From 30b9eba4279d26810d419bddbe9fc779ff3bc413 Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Wed, 29 Apr 2026 22:50:31 -0700 Subject: [PATCH] copilot fixes --- CHANGELOG.md | 4 + .../kingfisher-rules/data/rules/airbrake.yml | 4 + crates/kingfisher-rules/data/rules/fixer.yml | 6 +- .../kingfisher-rules/data/rules/railway.yml | 5 +- src/cli/commands/inputs.rs | 18 ++ src/direct_revoke.rs | 24 +- src/main.rs | 7 + src/parser/lexer.rs | 34 ++- src/reporter/json_format.rs | 6 +- src/scanner/repos.rs | 130 ++++---- src/scanner/runner.rs | 277 ++++++++++++++---- tests/int_allowlist.rs | 1 + tests/int_bitbucket.rs | 3 +- tests/int_dedup.rs | 1 + tests/int_github.rs | 3 +- tests/int_gitlab.rs | 6 +- tests/int_postman.rs | 11 +- tests/int_redact.rs | 11 +- tests/int_slack.rs | 11 +- tests/int_teams.rs | 11 +- tests/int_validation_cache.rs | 11 +- tests/int_vulnerable_files.rs | 1 + 22 files changed, 442 insertions(+), 143 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d643816..4f154e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. ## [v1.98.0] +- Bounded disk usage for large multi-repo scans (e.g. `--include-contributors --repo-artifacts` against orgs with thousands of repos): cloning, artifact fetching, and scanning now run concurrently through bounded channels, and each cloned repo is removed from the temp directory as soon as its scan completes. On-disk footprint stays roughly `O(num_jobs)` regardless of total repo count instead of growing without bound. `--keep-clones` and `--git-clone-dir` opt out of the per-repo cleanup as before. +- Parallelized `--repo-artifacts` fetching with `buffer_unordered(num_jobs)` so issue/PR/wiki API calls run concurrently and stream into the scan loop, replacing the previous per-repo serial loop that delayed the start of scanning by hours on large fan-outs. 
+- Changed `--format json` output to compact, one-envelope-per-line JSON so that the per-repo envelopes emitted by the parallel scan path concatenate into valid JSONL that `kingfisher view` can load. Pipe through `jq .` for pretty-printed output. +- Fixed a panic in the lexer when a string literal ends in a trailing backslash (`'... \`); the escape handling now clamps the index to the end of the input, so `extract_literal_values` returns the unterminated literal instead of slicing out of bounds. - Added first-class **Postman** scanning target: new `kingfisher scan postman` subcommand (and equivalent `--postman-*` flags) fetches workspaces, collections, and environments via the Postman API and scans them for hard-coded credentials in request `auth` blocks, pre-request/test scripts, saved example responses, and — notably — `secret`-typed environment variables, which the API returns in plaintext despite the UI mask. Selectors: `--workspace`, `--collection`, `--environment`, `--all`, with optional `--include-mocks-monitors` and `--api-url` for self-hosted endpoints. Authenticates via `KF_POSTMAN_TOKEN` (or `POSTMAN_API_KEY`) sent as `X-Api-Key`; honors `X-RateLimit-RetryAfter` on 429s. Findings link back to `https://go.postman.co/...` URLs in reports. - Fixed [#359](https://github.com/mongodb/kingfisher/issues/359): added `kingfisher.github.9` to detect the new ~520-character stateless GitHub App installation token format (`ghs__`). The legacy 36-character `ghs_` rule (`kingfisher.github.5`) is retained for older / GHES-issued tokens that are still in circulation. - Added provider endpoint overrides for validation and revocation via global `--endpoint PROVIDER=URL` and `--endpoint-config FILE`, with built-in support for self-hosted GitHub, GitLab, Gitea, Jira, Confluence, and Artifactory instances. 
diff --git a/crates/kingfisher-rules/data/rules/airbrake.yml b/crates/kingfisher-rules/data/rules/airbrake.yml index f261ffb..3c27ac4 100644 --- a/crates/kingfisher-rules/data/rules/airbrake.yml +++ b/crates/kingfisher-rules/data/rules/airbrake.yml @@ -36,4 +36,8 @@ rules: - words: - '"id"' type: WordMatch + - type: WordMatch + words: + - '"type":"Unauthorized"' + negative: true url: https://api.airbrake.io/api/v4/projects?key={{ TOKEN }} \ No newline at end of file diff --git a/crates/kingfisher-rules/data/rules/fixer.yml b/crates/kingfisher-rules/data/rules/fixer.yml index 3c309b1..77429ea 100644 --- a/crates/kingfisher-rules/data/rules/fixer.yml +++ b/crates/kingfisher-rules/data/rules/fixer.yml @@ -35,7 +35,7 @@ rules: - report_response: true - type: StatusMatch status: [200] - - type: StatusMatch - status: [401] - negative: true + - type: WordMatch + words: + - '"success":true' - type: JsonValid diff --git a/crates/kingfisher-rules/data/rules/railway.yml b/crates/kingfisher-rules/data/rules/railway.yml index 2392d69..3fcdab2 100644 --- a/crates/kingfisher-rules/data/rules/railway.yml +++ b/crates/kingfisher-rules/data/rules/railway.yml @@ -39,6 +39,5 @@ rules: status: [200] - type: WordMatch words: - - '"data"' - - '"me"' - match_all_words: true + - '"Not Authorized"' + negative: true diff --git a/src/cli/commands/inputs.rs b/src/cli/commands/inputs.rs index 263b24d..8abc316 100644 --- a/src/cli/commands/inputs.rs +++ b/src/cli/commands/inputs.rs @@ -476,6 +476,24 @@ impl InputSpecifierArgs { || !self.docker_image.is_empty() } + /// Return true when any flag has been set that schedules artifact + /// fetching (issues/PRs/wikis, Jira, Confluence, Slack, Teams, Postman, + /// or Docker images). Used by the scan runner to decide whether the + /// "no inputs" guard should bail out, since artifact dirs arrive via + /// the streaming scan channel rather than via `input_roots`. 
+ pub fn has_artifact_sources(&self) -> bool { + self.repo_artifacts + || self.jira_url.is_some() + || self.confluence_url.is_some() + || self.slack_query.is_some() + || self.teams_query.is_some() + || !self.postman_workspaces.is_empty() + || !self.postman_collections.is_empty() + || !self.postman_environments.is_empty() + || self.postman_all + || !self.docker_image.is_empty() + } + /// Emit deprecation warnings for legacy top-level provider flags. pub fn emit_deprecated_warnings(&self, used_legacy_git_url_flag: bool) { if used_legacy_git_url_flag { diff --git a/src/direct_revoke.rs b/src/direct_revoke.rs index f8847dc..ee65793 100644 --- a/src/direct_revoke.rs +++ b/src/direct_revoke.rs @@ -264,8 +264,9 @@ fn extract_value_from_response( serde_json::from_str(body).context("Response body is not valid JSON")?; // Simple JSONPath implementation supporting basic paths like: - // $.field, $.field.nested, $.array[0], $.array[0].field - let path_parts: Vec<&str> = path.trim_start_matches("$.").split('.').collect(); + // $.field, $.field.nested, $.array[0], $.array[0].field, $[0], $[0].field + let normalized = path.trim_start_matches('$').trim_start_matches('.'); + let path_parts: Vec<&str> = normalized.split('.').collect(); let mut current = &json; for part in path_parts { @@ -838,6 +839,25 @@ mod tests { assert_eq!(result.unwrap(), "b"); } + #[test] + fn jsonpath_root_array_index_field() { + // Used by kingfisher.jira.3 revocation: GET /rest/pat/latest/tokens + // returns a JSON array at the document root, and the rule extracts + // JIRA_TOKEN_ID with path "$[0].id". 
+ let ext = ResponseExtractor::JsonPath { path: "$[0].id".into() }; + let body = r#"[{"id":278,"name":"ITSYSENG-8330"}]"#; + let result = extract_value_from_response(&ext, body, &HeaderMap::new(), &StatusCode::OK); + assert_eq!(result.unwrap(), "278"); + } + + #[test] + fn jsonpath_root_array_index_scalar() { + let ext = ResponseExtractor::JsonPath { path: "$[1]".into() }; + let body = r#"["a","b","c"]"#; + let result = extract_value_from_response(&ext, body, &HeaderMap::new(), &StatusCode::OK); + assert_eq!(result.unwrap(), "b"); + } + #[test] fn jsonpath_missing_top_level_field() { let ext = ResponseExtractor::JsonPath { path: "$.nonexistent".into() }; diff --git a/src/main.rs b/src/main.rs index 759ef09..b0cd3ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -299,6 +299,12 @@ async fn async_main(args: CommandLineArgs) -> Result<()> { }; let keep_clones = scan_args.input_specifier_args.keep_clones && scan_args.input_specifier_args.git_clone_dir.is_none(); + // When clones go into the temp dir and the user hasn't asked to + // keep them, delete each clone as soon as it has been scanned so + // disk usage stays bounded for very large fan-outs (e.g. + // --include-contributors expanding to thousands of repos). 
+ let auto_cleanup_clones = !scan_args.input_specifier_args.keep_clones + && scan_args.input_specifier_args.git_clone_dir.is_none(); let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir))); info!( @@ -326,6 +332,7 @@ async fn async_main(args: CommandLineArgs) -> Result<()> { &rules_db, Arc::clone(&datastore), &update_status, + auto_cleanup_clones, ) .await?; if update_status.is_outdated { diff --git a/src/parser/lexer.rs b/src/parser/lexer.rs index 1106323..b006c0d 100644 --- a/src/parser/lexer.rs +++ b/src/parser/lexer.rs @@ -636,7 +636,10 @@ fn extract_literal_values(input: &str, allow_bare: bool) -> Vec { idx += 1; while idx < bytes.len() { if bytes[idx] == b'\\' && quote != b'`' { - idx += 2; + // Skip the escape byte and the byte that follows; + // clamp so a trailing backslash can't push idx past + // the end of the input. + idx = (idx + 2).min(bytes.len()); continue; } if bytes[idx] == quote { @@ -645,7 +648,8 @@ fn extract_literal_values(input: &str, allow_bare: bool) -> Vec { } idx += 1; } - values.push(input[start..idx].to_string()); + let end = idx.min(bytes.len()); + values.push(input[start..end].to_string()); } b'[' | b'(' | b'{' => { let (close, start) = match bytes[idx] { @@ -663,7 +667,9 @@ fn extract_literal_values(input: &str, allow_bare: bool) -> Vec { idx += 1; while idx < bytes.len() { if bytes[idx] == b'\\' && quote != b'`' { - idx += 2; + // Clamp so a trailing backslash inside a + // bracketed string can't escape past EOF. + idx = (idx + 2).min(bytes.len()); continue; } if bytes[idx] == quote { @@ -1061,6 +1067,28 @@ mod tests { assert_eq!(vals, vec![r#""he said \"hi\"""#]); } + #[test] + fn extract_literals_trailing_backslash_does_not_panic() { + // Regression: a string literal ending in a lone backslash used to + // push idx past bytes.len() and panic at `input[start..idx]`. + // The unterminated literal should be returned as-is (without the + // closing quote that doesn't exist) and contain the trailing + // backslash. 
+ let single = extract_literal_values("'foo \\", false); + assert_eq!(single, vec!["'foo \\"]); + + let double = extract_literal_values("\"foo \\", false); + assert_eq!(double, vec!["\"foo \\"]); + + // The bracketed form should also recurse into the unterminated + // string without panicking; the result may be empty if no inner + // values were closed, but the call must return. + let bracketed = extract_literal_values("['foo \\']", false); + // Only assert non-panic; exact shape depends on bracket matching + // around an unterminated string. + let _ = bracketed; + } + #[test] fn extract_literals_numbers() { let vals = extract_literal_values("42, -3.14, +1", false); diff --git a/src/reporter/json_format.rs b/src/reporter/json_format.rs index 71bb599..8843fdc 100644 --- a/src/reporter/json_format.rs +++ b/src/reporter/json_format.rs @@ -8,7 +8,11 @@ impl DetailsReporter { ) -> Result<()> { let envelope = self.build_report_envelope(args)?; if !envelope.findings.is_empty() || envelope.access_map.is_some() { - serde_json::to_writer_pretty(&mut writer, &envelope)?; + // Compact one-envelope-per-line so streaming emits (parallel + // scan path: one envelope per repo) concatenate into valid + // JSONL that `kingfisher view` can parse. Pipe through `jq .` + // for human-readable pretty output. + serde_json::to_writer(&mut writer, &envelope)?; writeln!(writer)?; } Ok(()) diff --git a/src/scanner/repos.rs b/src/scanner/repos.rs index f38f656..319cbb7 100644 --- a/src/scanner/repos.rs +++ b/src/scanner/repos.rs @@ -113,8 +113,13 @@ where ProgressBar::hidden() }; - let (ready_tx, ready_rx) = crossbeam_channel::unbounded(); let clone_concurrency = std::cmp::max(1, args.num_jobs); + // Bound this internal channel so cloner workers block on `send` when the + // single-threaded dispatcher loop below can't forward fast enough (e.g. + // because the outer bounded scan channel is full). 
Without this bound, + // cloners would race ahead and fill `/tmp` with completed-but-unscanned + // clones, which is exactly the disk-overflow bug we're trying to fix. + let (ready_tx, ready_rx) = crossbeam_channel::bounded(std::cmp::max(2, clone_concurrency * 2)); let ignore_certs = global_args.ignore_certs; ThreadPoolBuilder::new() @@ -813,6 +818,10 @@ pub async fn fetch_teams_messages( Ok(vec![output_dir]) } +/// Streams per-repo artifact directories (issues/PRs/wikis) into `out_tx` +/// as soon as each one is fetched, rather than collecting all results before +/// returning. This lets the scan loop start consuming artifact dirs while +/// remote fetches for other repos are still in flight. pub async fn fetch_git_host_artifacts( repo_urls: &[GitUrl], bitbucket_api_url: &Url, @@ -820,67 +829,76 @@ pub async fn fetch_git_host_artifacts( bitbucket_host: Option, global_args: &global::GlobalArgs, datastore: &Arc>, -) -> Result> { + concurrency: usize, + out_tx: crossbeam_channel::Sender, +) -> Result<()> { + use futures::stream::{self, StreamExt}; + let output_root = { let ds = datastore.lock().unwrap(); ds.clone_root() }; - let mut dirs = Vec::new(); - for repo_url in repo_urls { - let host = Url::parse(repo_url.as_str()) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - if host.contains("github") { - dirs.extend( - github::fetch_repo_items( - repo_url, - global_args.ignore_certs, - &output_root, - datastore, - ) - .await?, - ); - } else if host.contains("gitlab") { - dirs.extend( - gitlab::fetch_repo_items( - repo_url, - global_args.ignore_certs, - &output_root, - datastore, - ) - .await?, - ); - } else if host.contains("bitbucket") - || bitbucket_host - .as_deref() - .map(|expected| expected.eq_ignore_ascii_case(&host)) - .unwrap_or(false) - { - dirs.extend( - bitbucket::fetch_repo_items( - repo_url, - bitbucket_api_url, - bitbucket_auth, - global_args.ignore_certs, - &output_root, - datastore, - ) - .await?, - ); - } else if 
host.contains("dev.azure") || host.contains("visualstudio.com") { - dirs.extend( - azure::fetch_repo_items( - repo_url, - global_args.ignore_certs, - &output_root, - datastore, - ) - .await?, - ); + let concurrency = std::cmp::max(1, concurrency); + + // Fan out per-repo artifact fetches concurrently. Each completed fetch + // pushes its dirs into `out_tx` immediately so the scanner can pick them + // up while the remaining fetches are still running. + let mut stream = stream::iter(repo_urls.iter().cloned()) + .map(|repo_url| { + let bitbucket_api_url = bitbucket_api_url.clone(); + let bitbucket_auth = bitbucket_auth.clone(); + let bitbucket_host = bitbucket_host.clone(); + let output_root = output_root.clone(); + let datastore = Arc::clone(datastore); + let ignore_certs = global_args.ignore_certs; + async move { + let host = Url::parse(repo_url.as_str()) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + if host.contains("github") { + github::fetch_repo_items(&repo_url, ignore_certs, &output_root, &datastore) + .await + } else if host.contains("gitlab") { + gitlab::fetch_repo_items(&repo_url, ignore_certs, &output_root, &datastore) + .await + } else if host.contains("bitbucket") + || bitbucket_host + .as_deref() + .map(|expected| expected.eq_ignore_ascii_case(&host)) + .unwrap_or(false) + { + bitbucket::fetch_repo_items( + &repo_url, + &bitbucket_api_url, + &bitbucket_auth, + ignore_certs, + &output_root, + &datastore, + ) + .await + } else if host.contains("dev.azure") || host.contains("visualstudio.com") { + azure::fetch_repo_items(&repo_url, ignore_certs, &output_root, &datastore).await + } else { + Ok(Vec::new()) + } + } + }) + .buffer_unordered(concurrency); + + while let Some(result) = stream.next().await { + let dirs = result?; + for d in dirs { + // Send may block if the bounded channel is full; that's the + // intended backpressure. 
Errors only occur if the receiver has + // been dropped (scan aborted), in which case we stop producing. + if out_tx.send(d).is_err() { + debug!("scan channel closed; stopping git-host artifact fetcher"); + return Ok(()); + } } } - Ok(dirs) + Ok(()) } pub async fn fetch_s3_objects( diff --git a/src/scanner/runner.rs b/src/scanner/runner.rs index b9d744f..e7828c7 100644 --- a/src/scanner/runner.rs +++ b/src/scanner/runner.rs @@ -63,10 +63,18 @@ pub async fn run_scan( rules_db: &RulesDatabase, datastore: Arc>, update_status: &crate::update::UpdateStatus, + auto_cleanup_clones: bool, ) -> Result<()> { - run_async_scan(global_args, scan_args, Arc::clone(&datastore), rules_db, update_status) - .await - .context("Failed to run scan command") + run_async_scan( + global_args, + scan_args, + Arc::clone(&datastore), + rules_db, + update_status, + auto_cleanup_clones, + ) + .await + .context("Failed to run scan command") } pub async fn run_async_scan( @@ -75,6 +83,7 @@ pub async fn run_async_scan( datastore: Arc>, rules_db: &RulesDatabase, update_status: &crate::update::UpdateStatus, + auto_cleanup_clones: bool, ) -> Result<()> { // ── Phase 1: Input validation and environment setup ────────────────── validate_inputs(args)?; @@ -93,20 +102,37 @@ pub async fn run_async_scan( let repo_urls = enumerate_all_repos(args, global_args).await?; let mut input_roots = args.input_specifier_args.path_inputs.clone(); - let (repo_tx, repo_rx) = crossbeam_channel::unbounded(); - let repo_clone_handle = - start_repo_cloning(&repo_urls, args, global_args, &datastore, repo_tx, progress_enabled); + // Bound the channel feeding the scan loop. Both the cloner pool and the + // artifact-fetching task push into this channel; bounding it caps how + // many cloned-but-unscanned repos sit on disk while the scanner catches + // up. 
Combined with the inner cloner→dispatcher channel (also + // 2*num_jobs) and the per-repo cleanup after scan, the worst-case + // on-disk count is roughly 6*num_jobs (inner queue + outer queue + + // active cloners + active scans), i.e. O(num_jobs). + let scan_channel_cap = std::cmp::max(2, args.num_jobs * 2); + let (repo_tx, repo_rx) = crossbeam_channel::bounded(scan_channel_cap); - // ── Phase 3: Artifact fetching ────────────────────────────────────── - fetch_all_artifacts( + // ── Phase 3: Spawn cloning + artifact-fetching concurrently ───────── + // The scan loop will start consuming from `repo_rx` as soon as we get + // there in Phase 5; both producers feed it as their work completes. + let repo_clone_handle = start_repo_cloning( + &repo_urls, + args, + global_args, + &datastore, + repo_tx.clone(), + progress_enabled, + ); + let artifact_handle = start_artifact_fetching( args, global_args, &repo_urls, &datastore, - &mut input_roots, + repo_tx.clone(), progress_enabled, - ) - .await?; + ); + // Drop the local sender so the channel closes once all producers finish. + drop(repo_tx); // ── Phase 4: Scan configuration ───────────────────────────────────── let shared_profiler = Arc::new(ConcurrentRuleProfiler::new()); @@ -138,7 +164,15 @@ pub async fn run_async_scan( let has_remote_objects = args.input_specifier_args.s3_bucket.is_some() || args.input_specifier_args.gcs_bucket.is_some(); - if input_roots.is_empty() && repo_urls.is_empty() && !has_remote_objects { + // The artifact task pushes into `repo_rx` asynchronously, so we can't + // observe its work via `input_roots`. Defer to the type to know which + // flags schedule artifact fetching so this stays in sync as new sources + // are added. 
+ if input_roots.is_empty() + && repo_urls.is_empty() + && !has_remote_objects + && !args.input_specifier_args.has_artifact_sources() + { bail!("No inputs to scan"); } @@ -190,6 +224,7 @@ pub async fn run_async_scan( &mut input_roots, repo_rx, repo_clone_handle, + artifact_handle, &shared_profiler, enable_profiling, &matcher_stats, @@ -200,6 +235,7 @@ pub async fn run_async_scan( start_time, scan_started_at, update_status, + auto_cleanup_clones, ) .await?; return Ok(()); @@ -213,6 +249,7 @@ pub async fn run_async_scan( &repo_roots, repo_rx, repo_clone_handle, + artifact_handle, &shared_profiler, enable_profiling, &matcher_stats, @@ -223,6 +260,7 @@ pub async fn run_async_scan( start_time, scan_started_at, update_status, + auto_cleanup_clones, ) .await } @@ -338,53 +376,123 @@ fn start_repo_cloning( Some(handle) } -/// Fetches artifacts from various platforms (issues, wikis, Jira, Confluence, Slack, Docker). +/// Spawns a dedicated thread (with its own multi-threaded tokio runtime) +/// that streams artifact directories into `out_tx` as each fetch completes. +/// Decoupling from the parent runtime ensures the artifact task can make +/// progress regardless of how the parent runtime is configured (including +/// `#[tokio::test]`'s default single-threaded runtime), while the scan +/// loops on the parent thread block on sync `repo_rx.iter()`. +/// +/// # Panics +/// +/// Panics if the OS refuses to spawn the worker thread (e.g. resource +/// exhaustion). This is treated as unrecoverable on the main scan path +/// because every other concurrent component would face the same limit. 
+fn start_artifact_fetching( + args: &scan::ScanArgs, + global_args: &global::GlobalArgs, + repo_urls: &[crate::git_url::GitUrl], + datastore: &Arc>, + out_tx: crossbeam_channel::Sender, + progress_enabled: bool, +) -> std::thread::JoinHandle> { + let args = args.clone(); + let global_args = global_args.clone(); + let repo_urls = repo_urls.to_vec(); + let datastore = Arc::clone(datastore); + std::thread::Builder::new() + .name("artifact-fetcher".to_string()) + .spawn(move || -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(args.num_jobs.max(1)) + .enable_all() + .build() + .context("Failed to build artifact-fetcher runtime")?; + rt.block_on(fetch_all_artifacts( + &args, + &global_args, + &repo_urls, + &datastore, + out_tx, + progress_enabled, + )) + }) + .expect("failed to spawn artifact-fetcher thread") +} + +/// Fetches artifacts from various platforms (issues, wikis, Jira, Confluence, +/// Slack, Docker) and streams each produced directory into `out_tx` as soon +/// as it is ready, so the scan loop can process them concurrently with +/// further fetches and with cloning. Returns when all sources are exhausted +/// or when the receiver has been dropped (scan aborted). async fn fetch_all_artifacts( args: &scan::ScanArgs, global_args: &global::GlobalArgs, repo_urls: &[crate::git_url::GitUrl], datastore: &Arc>, - input_roots: &mut Vec, + out_tx: crossbeam_channel::Sender, progress_enabled: bool, ) -> Result<()> { let bitbucket_auth = bitbucket::AuthConfig::from_env(); let bitbucket_host = args.input_specifier_args.bitbucket_api_url.host_str().map(|s| s.to_string()); + let push = |dir: PathBuf, tx: &crossbeam_channel::Sender| -> bool { + // send blocks on bounded channel (intended backpressure); errors + // only happen if all receivers have been dropped (scan aborted). 
+ match tx.send(dir) { + Ok(()) => true, + Err(_) => { + debug!("scan channel closed; stopping artifact fetcher"); + false + } + } + }; + if args.input_specifier_args.repo_artifacts { - let repo_artifact_dirs = fetch_git_host_artifacts( + fetch_git_host_artifacts( repo_urls, &args.input_specifier_args.bitbucket_api_url, &bitbucket_auth, bitbucket_host.clone(), global_args, datastore, + args.num_jobs, + out_tx.clone(), ) .await?; - input_roots.extend(repo_artifact_dirs); } - // Fetch Jira issues if requested - let jira_dirs = fetch_jira_issues(args, global_args, datastore).await?; - input_roots.extend(jira_dirs); + for d in fetch_jira_issues(args, global_args, datastore).await? { + if !push(d, &out_tx) { + return Ok(()); + } + } - // Fetch Confluence pages if requested - let confluence_dirs = fetch_confluence_pages(args, global_args, datastore).await?; - input_roots.extend(confluence_dirs); + for d in fetch_confluence_pages(args, global_args, datastore).await? { + if !push(d, &out_tx) { + return Ok(()); + } + } - // Fetch Slack messages if requested - let slack_dirs = fetch_slack_messages(args, global_args, datastore).await?; - input_roots.extend(slack_dirs); + for d in fetch_slack_messages(args, global_args, datastore).await? { + if !push(d, &out_tx) { + return Ok(()); + } + } - // Fetch Teams messages if requested - let teams_dirs = fetch_teams_messages(args, global_args, datastore).await?; - input_roots.extend(teams_dirs); + for d in fetch_teams_messages(args, global_args, datastore).await? { + if !push(d, &out_tx) { + return Ok(()); + } + } - // Fetch Postman resources if requested - let postman_dirs = fetch_postman_resources(args, global_args, datastore).await?; - input_roots.extend(postman_dirs); + for d in fetch_postman_resources(args, global_args, datastore).await? 
{ + if !push(d, &out_tx) { + return Ok(()); + } + } - // Save Docker images if specified if !args.input_specifier_args.docker_image.is_empty() { let clone_root = { let ds = datastore.lock().unwrap(); @@ -401,7 +509,9 @@ async fn fetch_all_artifacts( let mut ds = datastore.lock().unwrap(); ds.register_docker_image(dir.clone(), img); } - input_roots.push(dir); + if !push(dir, &out_tx) { + return Ok(()); + } } } @@ -560,6 +670,7 @@ async fn run_sequential_scan( input_roots: &mut Vec, repo_rx: crossbeam_channel::Receiver, repo_clone_handle: Option>, + artifact_handle: std::thread::JoinHandle>, shared_profiler: &Arc, enable_profiling: bool, matcher_stats: &Arc>, @@ -570,39 +681,60 @@ async fn run_sequential_scan( start_time: Instant, scan_started_at: chrono::DateTime, update_status: &crate::update::UpdateStatus, + auto_cleanup_clones: bool, ) -> Result<()> { let mut streamed_roots = Vec::new(); - if !input_roots.is_empty() { - let _inputs = enumerate_filesystem_inputs( - args, - datastore.clone(), - input_roots, - progress_enabled, - rules_db, - enable_profiling, - Arc::clone(shared_profiler), - matcher_stats.as_ref(), - )?; - } + // Run the scan loop in a closure so that, even if a per-repo + // `enumerate_filesystem_inputs` returns Err and short-circuits via `?`, + // we still drop `repo_rx` and join the cloning + artifact-fetching + // threads before returning. Without this, the producer threads would + // continue cloning into `/tmp` after the scan has already failed. 
+ let scan_result: Result<()> = (|| { + if !input_roots.is_empty() { + let _inputs = enumerate_filesystem_inputs( + args, + datastore.clone(), + input_roots, + progress_enabled, + rules_db, + enable_profiling, + Arc::clone(shared_profiler), + matcher_stats.as_ref(), + )?; + } - for repo_root in repo_rx.iter() { - enumerate_filesystem_inputs( - args, - datastore.clone(), - &[repo_root.clone()], - progress_enabled, - rules_db, - enable_profiling, - Arc::clone(shared_profiler), - matcher_stats.as_ref(), - )?; - streamed_roots.push(repo_root); - } + for repo_root in repo_rx.iter() { + enumerate_filesystem_inputs( + args, + datastore.clone(), + &[repo_root.clone()], + progress_enabled, + rules_db, + enable_profiling, + Arc::clone(shared_profiler), + matcher_stats.as_ref(), + )?; + if auto_cleanup_clones && let Err(e) = fs::remove_dir_all(&repo_root) { + debug!("Failed to remove scanned clone {}: {e}", repo_root.display()); + } + streamed_roots.push(repo_root); + } + Ok(()) + })(); input_roots.extend(streamed_roots); if let Some(handle) = repo_clone_handle { let _ = handle.join(); } + let artifact_result = match artifact_handle.join() { + Ok(r) => r, + Err(_) => Err(anyhow::anyhow!("artifact fetch thread panicked")), + }; + + // Surface the scan error first; if scanning succeeded, surface any + // artifact-fetching error. 
+ scan_result?; + artifact_result.map_err(|e| e.context("artifact fetching failed"))?; deduplicate_new_matches(datastore, global_args, args, 0)?; apply_baseline_if_configured(args, datastore, baseline_path.as_ref(), input_roots)?; @@ -655,6 +787,7 @@ async fn run_parallel_scan( repo_roots: &[PathBuf], repo_rx: crossbeam_channel::Receiver, repo_clone_handle: Option>, + artifact_handle: std::thread::JoinHandle>, shared_profiler: &Arc, enable_profiling: bool, matcher_stats: &Arc>, @@ -665,6 +798,7 @@ async fn run_parallel_scan( start_time: Instant, scan_started_at: chrono::DateTime, update_status: &crate::update::UpdateStatus, + auto_cleanup_clones: bool, ) -> Result<()> { deduplicate_new_matches(datastore, global_args, args, 0)?; apply_baseline_if_configured(args, datastore, baseline_path.as_ref(), repo_roots)?; @@ -709,7 +843,15 @@ async fn run_parallel_scan( .build() .context("Failed to build repo scan thread pool")? .scope(|scope| { - let spawn_repo_scan = |root: PathBuf| { + // Distinguishes user-supplied `repo_roots` (must be preserved) + // from clones / artifact dirs that arrive via `repo_rx` and + // are eligible for post-scan cleanup. 
+ #[derive(Clone, Copy)] + enum ScanRootSource { + UserPath, + Streamed, + } + let spawn_repo_scan = |root: PathBuf, source: ScanRootSource| { let repo_rules = repo_rules.clone(); let base_clone_root = base_clone_root.clone(); let baseline_path = Arc::clone(baseline_path); @@ -815,21 +957,34 @@ async fn run_parallel_scan( error!("Repository scan failed: {e}"); repo_errors.lock().unwrap().push(e); } + + if matches!(source, ScanRootSource::Streamed) + && auto_cleanup_clones + && let Err(e) = fs::remove_dir_all(&root) + { + debug!("Failed to remove scanned clone {}: {e}", root.display()); + } }); }; for root in repo_roots.iter().cloned() { - spawn_repo_scan(root); + spawn_repo_scan(root, ScanRootSource::UserPath); } for root in repo_rx.iter() { - spawn_repo_scan(root); + spawn_repo_scan(root, ScanRootSource::Streamed); } }); if let Some(handle) = repo_clone_handle { let _ = handle.join(); } + // Surface artifact-fetching errors after all per-repo scans have finished. + match artifact_handle.join() { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(e.context("artifact fetching failed")), + Err(_) => return Err(anyhow::anyhow!("artifact fetch thread panicked")), + } if let Some(err) = repo_errors.lock().unwrap().pop() { return Err(err); diff --git a/tests/int_allowlist.rs b/tests/int_allowlist.rs index dd40e40..9217d94 100644 --- a/tests/int_allowlist.rs +++ b/tests/int_allowlist.rs @@ -221,6 +221,7 @@ fn run_skiplist(skip_regex: Vec, skip_skipword: Vec) -> Result Result<()> { let update_status = UpdateStatus::default(); runtime.block_on(async { - run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await + run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false) + .await })?; let ds = datastore.lock().unwrap(); diff --git a/tests/int_dedup.rs b/tests/int_dedup.rs index 914c17b..ef4a059 100644 --- a/tests/int_dedup.rs +++ b/tests/int_dedup.rs @@ -231,6 +231,7 @@ rules: Arc::clone(&datastore), &rules_db, 
&update_status, + false, ))?; let x = Ok(datastore.lock().unwrap().get_matches().len()); diff --git a/tests/int_github.rs b/tests/int_github.rs index 14ed62e..09f85ae 100644 --- a/tests/int_github.rs +++ b/tests/int_github.rs @@ -209,7 +209,8 @@ fn test_github_remote_scan() -> Result<()> { let update_status = UpdateStatus::default(); // Run the scan using runtime.block_on runtime.block_on(async { - run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await + run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false) + .await })?; // Get scan results let ds = datastore.lock().unwrap(); diff --git a/tests/int_gitlab.rs b/tests/int_gitlab.rs index 7b54d46..b2cfead 100644 --- a/tests/int_gitlab.rs +++ b/tests/int_gitlab.rs @@ -206,7 +206,8 @@ fn test_gitlab_remote_scan() -> Result<()> { let update_status = UpdateStatus::default(); rt.block_on(async { - run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await + run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false) + .await })?; let ds = datastore.lock().unwrap(); @@ -390,7 +391,8 @@ fn test_gitlab_remote_scan_no_history() -> Result<()> { let update_status = UpdateStatus::default(); rt.block_on(async { - run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await + run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false) + .await })?; let ds = datastore.lock().unwrap(); diff --git a/tests/int_postman.rs b/tests/int_postman.rs index 4c23658..f9fe833 100644 --- a/tests/int_postman.rs +++ b/tests/int_postman.rs @@ -277,8 +277,15 @@ async fn test_scan_postman_all() -> Result<()> { let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir))); let update_status = UpdateStatus::default(); - run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status) - .await?; + run_async_scan( + 
&global_args, + &scan_args, + Arc::clone(&datastore), + &rules_db, + &update_status, + false, + ) + .await?; let ds = datastore.lock().unwrap(); let findings = ds.get_matches().len(); diff --git a/tests/int_redact.rs b/tests/int_redact.rs index 6353577..a7111c7 100644 --- a/tests/int_redact.rs +++ b/tests/int_redact.rs @@ -182,8 +182,15 @@ async fn test_redact_hashes_finding_values() -> Result<()> { let update_status = UpdateStatus::default(); let datastore = Arc::new(Mutex::new(FindingsStore::new(temp_dir.path().to_path_buf()))); - run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status) - .await?; + run_async_scan( + &global_args, + &scan_args, + Arc::clone(&datastore), + &rules_db, + &update_status, + false, + ) + .await?; let ds = datastore.lock().unwrap(); let matches = ds.get_matches(); diff --git a/tests/int_slack.rs b/tests/int_slack.rs index 4038177..02dbaa6 100644 --- a/tests/int_slack.rs +++ b/tests/int_slack.rs @@ -352,8 +352,15 @@ async fn test_scan_slack_messages() -> Result<()> { let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir))); let update_status = UpdateStatus::default(); - run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &ctx.rules_db, &update_status) - .await?; + run_async_scan( + &global_args, + &scan_args, + Arc::clone(&datastore), + &ctx.rules_db, + &update_status, + false, + ) + .await?; let findings = { let ds = datastore.lock().unwrap(); diff --git a/tests/int_teams.rs b/tests/int_teams.rs index ea97ed5..7e45cd0 100644 --- a/tests/int_teams.rs +++ b/tests/int_teams.rs @@ -224,8 +224,15 @@ async fn test_scan_teams_messages() -> Result<()> { let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir))); let update_status = UpdateStatus::default(); - run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status) - .await?; + run_async_scan( + &global_args, + &scan_args, + Arc::clone(&datastore), + &rules_db, + &update_status, + false, + ) + 
.await?; let findings = { let ds = datastore.lock().unwrap(); diff --git a/tests/int_validation_cache.rs b/tests/int_validation_cache.rs index 7ec3071..f456c1c 100644 --- a/tests/int_validation_cache.rs +++ b/tests/int_validation_cache.rs @@ -275,8 +275,15 @@ async fn test_validation_cache_and_depvars() -> Result<()> { }; let update_status = UpdateStatus::default(); - run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status) - .await?; + run_async_scan( + &global_args, + &scan_args, + Arc::clone(&datastore), + &rules_db, + &update_status, + false, + ) + .await?; /* --------------------------------------------------------- * * 6. Assertions * diff --git a/tests/int_vulnerable_files.rs b/tests/int_vulnerable_files.rs index cbb6878..9b83b68 100644 --- a/tests/int_vulnerable_files.rs +++ b/tests/int_vulnerable_files.rs @@ -362,6 +362,7 @@ impl TestContext { Arc::clone(&datastore), &self.rules_db, &update_status, + false, ) .await?;