forked from mirrors/kingfisher
copilot fixes
This commit is contained in:
parent
ab93d4d242
commit
30b9eba427
22 changed files with 443 additions and 144 deletions
|
|
@ -3,6 +3,10 @@
|
|||
All notable changes to this project will be documented in this file.
|
||||
|
||||
## [v1.98.0]
|
||||
- Bounded disk usage for large multi-repo scans (e.g. `--include-contributors --repo-artifacts` against orgs with thousands of repos): cloning, artifact fetching, and scanning now run concurrently through bounded channels, and each cloned repo is removed from the temp directory as soon as its scan completes. On-disk footprint stays roughly `O(num_jobs)` regardless of total repo count instead of growing without bound. `--keep-clones` and `--git-clone-dir` opt out of the per-repo cleanup as before.
|
||||
- Parallelized `--repo-artifacts` fetching with `buffer_unordered(num_jobs)` so issue/PR/wiki API calls run concurrently and stream into the scan loop, replacing the previous per-repo serial loop that delayed the start of scanning by hours on large fan-outs.
|
||||
- Streamed `--format json` output as compact one-envelope-per-line so concatenated per-repo emits from the parallel scan path produce valid JSONL that `kingfisher view` can load. Pipe through `jq .` for pretty-printed output.
|
||||
- Fixed a panic in the lexer when a string literal ends in a trailing backslash (`'... \`); the escape handling now clamps past-EOF so `extract_literal_values` returns instead of slicing out of bounds.
|
||||
- Added first-class **Postman** scanning target: new `kingfisher scan postman` subcommand (and equivalent `--postman-*` flags) fetches workspaces, collections, and environments via the Postman API and scans them for hard-coded credentials in request `auth` blocks, pre-request/test scripts, saved example responses, and — notably — `secret`-typed environment variables, which the API returns in plaintext despite the UI mask. Selectors: `--workspace`, `--collection`, `--environment`, `--all`, with optional `--include-mocks-monitors` and `--api-url` for self-hosted endpoints. Authenticates via `KF_POSTMAN_TOKEN` (or `POSTMAN_API_KEY`) sent as `X-Api-Key`; honors `X-RateLimit-RetryAfter` on 429s. Findings link back to `https://go.postman.co/...` URLs in reports.
|
||||
- Fixed [#359](https://github.com/mongodb/kingfisher/issues/359): added `kingfisher.github.9` to detect the new ~520-character stateless GitHub App installation token format (`ghs_<APP_ID>_<JWT>`). The legacy 36-character `ghs_` rule (`kingfisher.github.5`) is retained for older / GHES-issued tokens that are still in circulation.
|
||||
- Added provider endpoint overrides for validation and revocation via global `--endpoint PROVIDER=URL` and `--endpoint-config FILE`, with built-in support for self-hosted GitHub, GitLab, Gitea, Jira, Confluence, and Artifactory instances.
|
||||
|
|
|
|||
|
|
@ -36,4 +36,8 @@ rules:
|
|||
- words:
|
||||
- '"id"'
|
||||
type: WordMatch
|
||||
- type: Word
|
||||
match_words:
|
||||
- '"type":"Unauthorized"'
|
||||
negative: true
|
||||
url: https://api.airbrake.io/api/v4/projects?key={{ TOKEN }}
|
||||
|
|
@ -35,7 +35,7 @@ rules:
|
|||
- report_response: true
|
||||
- type: StatusMatch
|
||||
status: [200]
|
||||
- type: StatusMatch
|
||||
status: [401]
|
||||
negative: true
|
||||
- type: WordMatch
|
||||
words:
|
||||
- '"success":true'
|
||||
- type: JsonValid
|
||||
|
|
|
|||
|
|
@ -39,6 +39,5 @@ rules:
|
|||
status: [200]
|
||||
- type: WordMatch
|
||||
words:
|
||||
- '"data"'
|
||||
- '"me"'
|
||||
match_all_words: true
|
||||
- '"Not Authorized"'
|
||||
negative: true
|
||||
|
|
|
|||
|
|
@ -476,6 +476,24 @@ impl InputSpecifierArgs {
|
|||
|| !self.docker_image.is_empty()
|
||||
}
|
||||
|
||||
/// Return true when any flag has been set that schedules artifact
|
||||
/// fetching (issues/PRs/wikis, Jira, Confluence, Slack, Teams, Postman,
|
||||
/// or Docker images). Used by the scan runner to decide whether the
|
||||
/// "no inputs" guard should bail out, since artifact dirs arrive via
|
||||
/// the streaming scan channel rather than via `input_roots`.
|
||||
pub fn has_artifact_sources(&self) -> bool {
|
||||
self.repo_artifacts
|
||||
|| self.jira_url.is_some()
|
||||
|| self.confluence_url.is_some()
|
||||
|| self.slack_query.is_some()
|
||||
|| self.teams_query.is_some()
|
||||
|| !self.postman_workspaces.is_empty()
|
||||
|| !self.postman_collections.is_empty()
|
||||
|| !self.postman_environments.is_empty()
|
||||
|| self.postman_all
|
||||
|| !self.docker_image.is_empty()
|
||||
}
|
||||
|
||||
/// Emit deprecation warnings for legacy top-level provider flags.
|
||||
pub fn emit_deprecated_warnings(&self, used_legacy_git_url_flag: bool) {
|
||||
if used_legacy_git_url_flag {
|
||||
|
|
|
|||
|
|
@ -264,8 +264,9 @@ fn extract_value_from_response(
|
|||
serde_json::from_str(body).context("Response body is not valid JSON")?;
|
||||
|
||||
// Simple JSONPath implementation supporting basic paths like:
|
||||
// $.field, $.field.nested, $.array[0], $.array[0].field
|
||||
let path_parts: Vec<&str> = path.trim_start_matches("$.").split('.').collect();
|
||||
// $.field, $.field.nested, $.array[0], $.array[0].field, $[0], $[0].field
|
||||
let normalized = path.trim_start_matches('$').trim_start_matches('.');
|
||||
let path_parts: Vec<&str> = normalized.split('.').collect();
|
||||
|
||||
let mut current = &json;
|
||||
for part in path_parts {
|
||||
|
|
@ -838,6 +839,25 @@ mod tests {
|
|||
assert_eq!(result.unwrap(), "b");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn jsonpath_root_array_index_field() {
|
||||
// Used by kingfisher.jira.3 revocation: GET /rest/pat/latest/tokens
|
||||
// returns a JSON array at the document root, and the rule extracts
|
||||
// JIRA_TOKEN_ID with path "$[0].id".
|
||||
let ext = ResponseExtractor::JsonPath { path: "$[0].id".into() };
|
||||
let body = r#"[{"id":278,"name":"ITSYSENG-8330"}]"#;
|
||||
let result = extract_value_from_response(&ext, body, &HeaderMap::new(), &StatusCode::OK);
|
||||
assert_eq!(result.unwrap(), "278");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn jsonpath_root_array_index_scalar() {
|
||||
let ext = ResponseExtractor::JsonPath { path: "$[1]".into() };
|
||||
let body = r#"["a","b","c"]"#;
|
||||
let result = extract_value_from_response(&ext, body, &HeaderMap::new(), &StatusCode::OK);
|
||||
assert_eq!(result.unwrap(), "b");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn jsonpath_missing_top_level_field() {
|
||||
let ext = ResponseExtractor::JsonPath { path: "$.nonexistent".into() };
|
||||
|
|
|
|||
|
|
@ -299,6 +299,12 @@ async fn async_main(args: CommandLineArgs) -> Result<()> {
|
|||
};
|
||||
let keep_clones = scan_args.input_specifier_args.keep_clones
|
||||
&& scan_args.input_specifier_args.git_clone_dir.is_none();
|
||||
// When clones go into the temp dir and the user hasn't asked to
|
||||
// keep them, delete each clone as soon as it has been scanned so
|
||||
// disk usage stays bounded for very large fan-outs (e.g.
|
||||
// --include-contributors expanding to thousands of repos).
|
||||
let auto_cleanup_clones = !scan_args.input_specifier_args.keep_clones
|
||||
&& scan_args.input_specifier_args.git_clone_dir.is_none();
|
||||
|
||||
let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir)));
|
||||
info!(
|
||||
|
|
@ -326,6 +332,7 @@ async fn async_main(args: CommandLineArgs) -> Result<()> {
|
|||
&rules_db,
|
||||
Arc::clone(&datastore),
|
||||
&update_status,
|
||||
auto_cleanup_clones,
|
||||
)
|
||||
.await?;
|
||||
if update_status.is_outdated {
|
||||
|
|
|
|||
|
|
@ -636,7 +636,10 @@ fn extract_literal_values(input: &str, allow_bare: bool) -> Vec<String> {
|
|||
idx += 1;
|
||||
while idx < bytes.len() {
|
||||
if bytes[idx] == b'\\' && quote != b'`' {
|
||||
idx += 2;
|
||||
// Skip the escape byte and the byte that follows;
|
||||
// clamp so a trailing backslash can't push idx past
|
||||
// the end of the input.
|
||||
idx = (idx + 2).min(bytes.len());
|
||||
continue;
|
||||
}
|
||||
if bytes[idx] == quote {
|
||||
|
|
@ -645,7 +648,8 @@ fn extract_literal_values(input: &str, allow_bare: bool) -> Vec<String> {
|
|||
}
|
||||
idx += 1;
|
||||
}
|
||||
values.push(input[start..idx].to_string());
|
||||
let end = idx.min(bytes.len());
|
||||
values.push(input[start..end].to_string());
|
||||
}
|
||||
b'[' | b'(' | b'{' => {
|
||||
let (close, start) = match bytes[idx] {
|
||||
|
|
@ -663,7 +667,9 @@ fn extract_literal_values(input: &str, allow_bare: bool) -> Vec<String> {
|
|||
idx += 1;
|
||||
while idx < bytes.len() {
|
||||
if bytes[idx] == b'\\' && quote != b'`' {
|
||||
idx += 2;
|
||||
// Clamp so a trailing backslash inside a
|
||||
// bracketed string can't escape past EOF.
|
||||
idx = (idx + 2).min(bytes.len());
|
||||
continue;
|
||||
}
|
||||
if bytes[idx] == quote {
|
||||
|
|
@ -1061,6 +1067,28 @@ mod tests {
|
|||
assert_eq!(vals, vec![r#""he said \"hi\"""#]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_literals_trailing_backslash_does_not_panic() {
|
||||
// Regression: a string literal ending in a lone backslash used to
|
||||
// push idx past bytes.len() and panic at `input[start..idx]`.
|
||||
// The unterminated literal should be returned as-is (without the
|
||||
// closing quote that doesn't exist) and contain the trailing
|
||||
// backslash.
|
||||
let single = extract_literal_values("'foo \\", false);
|
||||
assert_eq!(single, vec!["'foo \\"]);
|
||||
|
||||
let double = extract_literal_values("\"foo \\", false);
|
||||
assert_eq!(double, vec!["\"foo \\"]);
|
||||
|
||||
// The bracketed form should also recurse into the unterminated
|
||||
// string without panicking; the result may be empty if no inner
|
||||
// values were closed, but the call must return.
|
||||
let bracketed = extract_literal_values("['foo \\']", false);
|
||||
// Only assert non-panic; exact shape depends on bracket matching
|
||||
// around an unterminated string.
|
||||
let _ = bracketed;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_literals_numbers() {
|
||||
let vals = extract_literal_values("42, -3.14, +1", false);
|
||||
|
|
|
|||
|
|
@ -8,7 +8,11 @@ impl DetailsReporter {
|
|||
) -> Result<()> {
|
||||
let envelope = self.build_report_envelope(args)?;
|
||||
if !envelope.findings.is_empty() || envelope.access_map.is_some() {
|
||||
serde_json::to_writer_pretty(&mut writer, &envelope)?;
|
||||
// Compact one-envelope-per-line so streaming emits (parallel
|
||||
// scan path: one envelope per repo) concatenate into valid
|
||||
// JSONL that `kingfisher view` can parse. Pipe through `jq .`
|
||||
// for human-readable pretty output.
|
||||
serde_json::to_writer(&mut writer, &envelope)?;
|
||||
writeln!(writer)?;
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -113,8 +113,13 @@ where
|
|||
ProgressBar::hidden()
|
||||
};
|
||||
|
||||
let (ready_tx, ready_rx) = crossbeam_channel::unbounded();
|
||||
let clone_concurrency = std::cmp::max(1, args.num_jobs);
|
||||
// Bound this internal channel so cloner workers block on `send` when the
|
||||
// single-threaded dispatcher loop below can't forward fast enough (e.g.
|
||||
// because the outer bounded scan channel is full). Without this bound,
|
||||
// cloners would race ahead and fill `/tmp` with completed-but-unscanned
|
||||
// clones, which is exactly the disk-overflow bug we're trying to fix.
|
||||
let (ready_tx, ready_rx) = crossbeam_channel::bounded(std::cmp::max(2, clone_concurrency * 2));
|
||||
let ignore_certs = global_args.ignore_certs;
|
||||
|
||||
ThreadPoolBuilder::new()
|
||||
|
|
@ -813,6 +818,10 @@ pub async fn fetch_teams_messages(
|
|||
Ok(vec![output_dir])
|
||||
}
|
||||
|
||||
/// Streams per-repo artifact directories (issues/PRs/wikis) into `out_tx`
|
||||
/// as soon as each one is fetched, rather than collecting all results before
|
||||
/// returning. This lets the scan loop start consuming artifact dirs while
|
||||
/// remote fetches for other repos are still in flight.
|
||||
pub async fn fetch_git_host_artifacts(
|
||||
repo_urls: &[GitUrl],
|
||||
bitbucket_api_url: &Url,
|
||||
|
|
@ -820,67 +829,76 @@ pub async fn fetch_git_host_artifacts(
|
|||
bitbucket_host: Option<String>,
|
||||
global_args: &global::GlobalArgs,
|
||||
datastore: &Arc<Mutex<findings_store::FindingsStore>>,
|
||||
) -> Result<Vec<PathBuf>> {
|
||||
concurrency: usize,
|
||||
out_tx: crossbeam_channel::Sender<PathBuf>,
|
||||
) -> Result<()> {
|
||||
use futures::stream::{self, StreamExt};
|
||||
|
||||
let output_root = {
|
||||
let ds = datastore.lock().unwrap();
|
||||
ds.clone_root()
|
||||
};
|
||||
let mut dirs = Vec::new();
|
||||
for repo_url in repo_urls {
|
||||
let host = Url::parse(repo_url.as_str())
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
if host.contains("github") {
|
||||
dirs.extend(
|
||||
github::fetch_repo_items(
|
||||
repo_url,
|
||||
global_args.ignore_certs,
|
||||
&output_root,
|
||||
datastore,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
} else if host.contains("gitlab") {
|
||||
dirs.extend(
|
||||
gitlab::fetch_repo_items(
|
||||
repo_url,
|
||||
global_args.ignore_certs,
|
||||
&output_root,
|
||||
datastore,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
} else if host.contains("bitbucket")
|
||||
|| bitbucket_host
|
||||
.as_deref()
|
||||
.map(|expected| expected.eq_ignore_ascii_case(&host))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
dirs.extend(
|
||||
bitbucket::fetch_repo_items(
|
||||
repo_url,
|
||||
bitbucket_api_url,
|
||||
bitbucket_auth,
|
||||
global_args.ignore_certs,
|
||||
&output_root,
|
||||
datastore,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
} else if host.contains("dev.azure") || host.contains("visualstudio.com") {
|
||||
dirs.extend(
|
||||
azure::fetch_repo_items(
|
||||
repo_url,
|
||||
global_args.ignore_certs,
|
||||
&output_root,
|
||||
datastore,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
let concurrency = std::cmp::max(1, concurrency);
|
||||
|
||||
// Fan out per-repo artifact fetches concurrently. Each completed fetch
|
||||
// pushes its dirs into `out_tx` immediately so the scanner can pick them
|
||||
// up while the remaining fetches are still running.
|
||||
let mut stream = stream::iter(repo_urls.iter().cloned())
|
||||
.map(|repo_url| {
|
||||
let bitbucket_api_url = bitbucket_api_url.clone();
|
||||
let bitbucket_auth = bitbucket_auth.clone();
|
||||
let bitbucket_host = bitbucket_host.clone();
|
||||
let output_root = output_root.clone();
|
||||
let datastore = Arc::clone(datastore);
|
||||
let ignore_certs = global_args.ignore_certs;
|
||||
async move {
|
||||
let host = Url::parse(repo_url.as_str())
|
||||
.ok()
|
||||
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||
.unwrap_or_default();
|
||||
if host.contains("github") {
|
||||
github::fetch_repo_items(&repo_url, ignore_certs, &output_root, &datastore)
|
||||
.await
|
||||
} else if host.contains("gitlab") {
|
||||
gitlab::fetch_repo_items(&repo_url, ignore_certs, &output_root, &datastore)
|
||||
.await
|
||||
} else if host.contains("bitbucket")
|
||||
|| bitbucket_host
|
||||
.as_deref()
|
||||
.map(|expected| expected.eq_ignore_ascii_case(&host))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
bitbucket::fetch_repo_items(
|
||||
&repo_url,
|
||||
&bitbucket_api_url,
|
||||
&bitbucket_auth,
|
||||
ignore_certs,
|
||||
&output_root,
|
||||
&datastore,
|
||||
)
|
||||
.await
|
||||
} else if host.contains("dev.azure") || host.contains("visualstudio.com") {
|
||||
azure::fetch_repo_items(&repo_url, ignore_certs, &output_root, &datastore).await
|
||||
} else {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(concurrency);
|
||||
|
||||
while let Some(result) = stream.next().await {
|
||||
let dirs = result?;
|
||||
for d in dirs {
|
||||
// Send may block if the bounded channel is full; that's the
|
||||
// intended backpressure. Errors only occur if the receiver has
|
||||
// been dropped (scan aborted), in which case we stop producing.
|
||||
if out_tx.send(d).is_err() {
|
||||
debug!("scan channel closed; stopping git-host artifact fetcher");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(dirs)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fetch_s3_objects(
|
||||
|
|
|
|||
|
|
@ -63,10 +63,18 @@ pub async fn run_scan(
|
|||
rules_db: &RulesDatabase,
|
||||
datastore: Arc<Mutex<FindingsStore>>,
|
||||
update_status: &crate::update::UpdateStatus,
|
||||
auto_cleanup_clones: bool,
|
||||
) -> Result<()> {
|
||||
run_async_scan(global_args, scan_args, Arc::clone(&datastore), rules_db, update_status)
|
||||
.await
|
||||
.context("Failed to run scan command")
|
||||
run_async_scan(
|
||||
global_args,
|
||||
scan_args,
|
||||
Arc::clone(&datastore),
|
||||
rules_db,
|
||||
update_status,
|
||||
auto_cleanup_clones,
|
||||
)
|
||||
.await
|
||||
.context("Failed to run scan command")
|
||||
}
|
||||
|
||||
pub async fn run_async_scan(
|
||||
|
|
@ -75,6 +83,7 @@ pub async fn run_async_scan(
|
|||
datastore: Arc<Mutex<findings_store::FindingsStore>>,
|
||||
rules_db: &RulesDatabase,
|
||||
update_status: &crate::update::UpdateStatus,
|
||||
auto_cleanup_clones: bool,
|
||||
) -> Result<()> {
|
||||
// ── Phase 1: Input validation and environment setup ──────────────────
|
||||
validate_inputs(args)?;
|
||||
|
|
@ -93,20 +102,37 @@ pub async fn run_async_scan(
|
|||
let repo_urls = enumerate_all_repos(args, global_args).await?;
|
||||
|
||||
let mut input_roots = args.input_specifier_args.path_inputs.clone();
|
||||
let (repo_tx, repo_rx) = crossbeam_channel::unbounded();
|
||||
let repo_clone_handle =
|
||||
start_repo_cloning(&repo_urls, args, global_args, &datastore, repo_tx, progress_enabled);
|
||||
// Bound the channel feeding the scan loop. Both the cloner pool and the
|
||||
// artifact-fetching task push into this channel; bounding it caps how
|
||||
// many cloned-but-unscanned repos sit on disk while the scanner catches
|
||||
// up. Combined with the inner cloner→dispatcher channel (also
|
||||
// 2*num_jobs) and the per-repo cleanup after scan, the worst-case
|
||||
// on-disk count is roughly 6*num_jobs (inner queue + outer queue +
|
||||
// active cloners + active scans), i.e. O(num_jobs).
|
||||
let scan_channel_cap = std::cmp::max(2, args.num_jobs * 2);
|
||||
let (repo_tx, repo_rx) = crossbeam_channel::bounded(scan_channel_cap);
|
||||
|
||||
// ── Phase 3: Artifact fetching ──────────────────────────────────────
|
||||
fetch_all_artifacts(
|
||||
// ── Phase 3: Spawn cloning + artifact-fetching concurrently ─────────
|
||||
// The scan loop will start consuming from `repo_rx` as soon as we get
|
||||
// there in Phase 5; both producers feed it as their work completes.
|
||||
let repo_clone_handle = start_repo_cloning(
|
||||
&repo_urls,
|
||||
args,
|
||||
global_args,
|
||||
&datastore,
|
||||
repo_tx.clone(),
|
||||
progress_enabled,
|
||||
);
|
||||
let artifact_handle = start_artifact_fetching(
|
||||
args,
|
||||
global_args,
|
||||
&repo_urls,
|
||||
&datastore,
|
||||
&mut input_roots,
|
||||
repo_tx.clone(),
|
||||
progress_enabled,
|
||||
)
|
||||
.await?;
|
||||
);
|
||||
// Drop the local sender so the channel closes once all producers finish.
|
||||
drop(repo_tx);
|
||||
|
||||
// ── Phase 4: Scan configuration ─────────────────────────────────────
|
||||
let shared_profiler = Arc::new(ConcurrentRuleProfiler::new());
|
||||
|
|
@ -138,7 +164,15 @@ pub async fn run_async_scan(
|
|||
|
||||
let has_remote_objects = args.input_specifier_args.s3_bucket.is_some()
|
||||
|| args.input_specifier_args.gcs_bucket.is_some();
|
||||
if input_roots.is_empty() && repo_urls.is_empty() && !has_remote_objects {
|
||||
// The artifact task pushes into `repo_rx` asynchronously, so we can't
|
||||
// observe its work via `input_roots`. Defer to the type to know which
|
||||
// flags schedule artifact fetching so this stays in sync as new sources
|
||||
// are added.
|
||||
if input_roots.is_empty()
|
||||
&& repo_urls.is_empty()
|
||||
&& !has_remote_objects
|
||||
&& !args.input_specifier_args.has_artifact_sources()
|
||||
{
|
||||
bail!("No inputs to scan");
|
||||
}
|
||||
|
||||
|
|
@ -190,6 +224,7 @@ pub async fn run_async_scan(
|
|||
&mut input_roots,
|
||||
repo_rx,
|
||||
repo_clone_handle,
|
||||
artifact_handle,
|
||||
&shared_profiler,
|
||||
enable_profiling,
|
||||
&matcher_stats,
|
||||
|
|
@ -200,6 +235,7 @@ pub async fn run_async_scan(
|
|||
start_time,
|
||||
scan_started_at,
|
||||
update_status,
|
||||
auto_cleanup_clones,
|
||||
)
|
||||
.await?;
|
||||
return Ok(());
|
||||
|
|
@ -213,6 +249,7 @@ pub async fn run_async_scan(
|
|||
&repo_roots,
|
||||
repo_rx,
|
||||
repo_clone_handle,
|
||||
artifact_handle,
|
||||
&shared_profiler,
|
||||
enable_profiling,
|
||||
&matcher_stats,
|
||||
|
|
@ -223,6 +260,7 @@ pub async fn run_async_scan(
|
|||
start_time,
|
||||
scan_started_at,
|
||||
update_status,
|
||||
auto_cleanup_clones,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -338,53 +376,123 @@ fn start_repo_cloning(
|
|||
Some(handle)
|
||||
}
|
||||
|
||||
/// Fetches artifacts from various platforms (issues, wikis, Jira, Confluence, Slack, Docker).
|
||||
/// Spawns a dedicated thread (with its own multi-threaded tokio runtime)
|
||||
/// that streams artifact directories into `out_tx` as each fetch completes.
|
||||
/// Decoupling from the parent runtime ensures the artifact task can make
|
||||
/// progress regardless of how the parent runtime is configured (including
|
||||
/// `#[tokio::test]`'s default single-threaded runtime), while the scan
|
||||
/// loops on the parent thread block on sync `repo_rx.iter()`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the OS refuses to spawn the worker thread (e.g. resource
|
||||
/// exhaustion). This is treated as unrecoverable on the main scan path
|
||||
/// because every other concurrent component would face the same limit.
|
||||
fn start_artifact_fetching(
|
||||
args: &scan::ScanArgs,
|
||||
global_args: &global::GlobalArgs,
|
||||
repo_urls: &[crate::git_url::GitUrl],
|
||||
datastore: &Arc<Mutex<FindingsStore>>,
|
||||
out_tx: crossbeam_channel::Sender<PathBuf>,
|
||||
progress_enabled: bool,
|
||||
) -> std::thread::JoinHandle<Result<()>> {
|
||||
let args = args.clone();
|
||||
let global_args = global_args.clone();
|
||||
let repo_urls = repo_urls.to_vec();
|
||||
let datastore = Arc::clone(datastore);
|
||||
std::thread::Builder::new()
|
||||
.name("artifact-fetcher".to_string())
|
||||
.spawn(move || -> Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(args.num_jobs.max(1))
|
||||
.enable_all()
|
||||
.build()
|
||||
.context("Failed to build artifact-fetcher runtime")?;
|
||||
rt.block_on(fetch_all_artifacts(
|
||||
&args,
|
||||
&global_args,
|
||||
&repo_urls,
|
||||
&datastore,
|
||||
out_tx,
|
||||
progress_enabled,
|
||||
))
|
||||
})
|
||||
.expect("failed to spawn artifact-fetcher thread")
|
||||
}
|
||||
|
||||
/// Fetches artifacts from various platforms (issues, wikis, Jira, Confluence,
|
||||
/// Slack, Docker) and streams each produced directory into `out_tx` as soon
|
||||
/// as it is ready, so the scan loop can process them concurrently with
|
||||
/// further fetches and with cloning. Returns when all sources are exhausted
|
||||
/// or when the receiver has been dropped (scan aborted).
|
||||
async fn fetch_all_artifacts(
|
||||
args: &scan::ScanArgs,
|
||||
global_args: &global::GlobalArgs,
|
||||
repo_urls: &[crate::git_url::GitUrl],
|
||||
datastore: &Arc<Mutex<FindingsStore>>,
|
||||
input_roots: &mut Vec<PathBuf>,
|
||||
out_tx: crossbeam_channel::Sender<PathBuf>,
|
||||
progress_enabled: bool,
|
||||
) -> Result<()> {
|
||||
let bitbucket_auth = bitbucket::AuthConfig::from_env();
|
||||
let bitbucket_host =
|
||||
args.input_specifier_args.bitbucket_api_url.host_str().map(|s| s.to_string());
|
||||
|
||||
let push = |dir: PathBuf, tx: &crossbeam_channel::Sender<PathBuf>| -> bool {
|
||||
// send blocks on bounded channel (intended backpressure); errors
|
||||
// only happen if all receivers have been dropped (scan aborted).
|
||||
match tx.send(dir) {
|
||||
Ok(()) => true,
|
||||
Err(_) => {
|
||||
debug!("scan channel closed; stopping artifact fetcher");
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if args.input_specifier_args.repo_artifacts {
|
||||
let repo_artifact_dirs = fetch_git_host_artifacts(
|
||||
fetch_git_host_artifacts(
|
||||
repo_urls,
|
||||
&args.input_specifier_args.bitbucket_api_url,
|
||||
&bitbucket_auth,
|
||||
bitbucket_host.clone(),
|
||||
global_args,
|
||||
datastore,
|
||||
args.num_jobs,
|
||||
out_tx.clone(),
|
||||
)
|
||||
.await?;
|
||||
input_roots.extend(repo_artifact_dirs);
|
||||
}
|
||||
|
||||
// Fetch Jira issues if requested
|
||||
let jira_dirs = fetch_jira_issues(args, global_args, datastore).await?;
|
||||
input_roots.extend(jira_dirs);
|
||||
for d in fetch_jira_issues(args, global_args, datastore).await? {
|
||||
if !push(d, &out_tx) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch Confluence pages if requested
|
||||
let confluence_dirs = fetch_confluence_pages(args, global_args, datastore).await?;
|
||||
input_roots.extend(confluence_dirs);
|
||||
for d in fetch_confluence_pages(args, global_args, datastore).await? {
|
||||
if !push(d, &out_tx) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch Slack messages if requested
|
||||
let slack_dirs = fetch_slack_messages(args, global_args, datastore).await?;
|
||||
input_roots.extend(slack_dirs);
|
||||
for d in fetch_slack_messages(args, global_args, datastore).await? {
|
||||
if !push(d, &out_tx) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch Teams messages if requested
|
||||
let teams_dirs = fetch_teams_messages(args, global_args, datastore).await?;
|
||||
input_roots.extend(teams_dirs);
|
||||
for d in fetch_teams_messages(args, global_args, datastore).await? {
|
||||
if !push(d, &out_tx) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch Postman resources if requested
|
||||
let postman_dirs = fetch_postman_resources(args, global_args, datastore).await?;
|
||||
input_roots.extend(postman_dirs);
|
||||
for d in fetch_postman_resources(args, global_args, datastore).await? {
|
||||
if !push(d, &out_tx) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Save Docker images if specified
|
||||
if !args.input_specifier_args.docker_image.is_empty() {
|
||||
let clone_root = {
|
||||
let ds = datastore.lock().unwrap();
|
||||
|
|
@ -401,7 +509,9 @@ async fn fetch_all_artifacts(
|
|||
let mut ds = datastore.lock().unwrap();
|
||||
ds.register_docker_image(dir.clone(), img);
|
||||
}
|
||||
input_roots.push(dir);
|
||||
if !push(dir, &out_tx) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -560,6 +670,7 @@ async fn run_sequential_scan(
|
|||
input_roots: &mut Vec<PathBuf>,
|
||||
repo_rx: crossbeam_channel::Receiver<PathBuf>,
|
||||
repo_clone_handle: Option<std::thread::JoinHandle<()>>,
|
||||
artifact_handle: std::thread::JoinHandle<Result<()>>,
|
||||
shared_profiler: &Arc<ConcurrentRuleProfiler>,
|
||||
enable_profiling: bool,
|
||||
matcher_stats: &Arc<Mutex<MatcherStats>>,
|
||||
|
|
@ -570,39 +681,60 @@ async fn run_sequential_scan(
|
|||
start_time: Instant,
|
||||
scan_started_at: chrono::DateTime<chrono::Local>,
|
||||
update_status: &crate::update::UpdateStatus,
|
||||
auto_cleanup_clones: bool,
|
||||
) -> Result<()> {
|
||||
let mut streamed_roots = Vec::new();
|
||||
if !input_roots.is_empty() {
|
||||
let _inputs = enumerate_filesystem_inputs(
|
||||
args,
|
||||
datastore.clone(),
|
||||
input_roots,
|
||||
progress_enabled,
|
||||
rules_db,
|
||||
enable_profiling,
|
||||
Arc::clone(shared_profiler),
|
||||
matcher_stats.as_ref(),
|
||||
)?;
|
||||
}
|
||||
// Run the scan loop in a closure so that, even if a per-repo
|
||||
// `enumerate_filesystem_inputs` returns Err and short-circuits via `?`,
|
||||
// we still drop `repo_rx` and join the cloning + artifact-fetching
|
||||
// threads before returning. Without this, the producer threads would
|
||||
// continue cloning into `/tmp` after the scan has already failed.
|
||||
let scan_result: Result<()> = (|| {
|
||||
if !input_roots.is_empty() {
|
||||
let _inputs = enumerate_filesystem_inputs(
|
||||
args,
|
||||
datastore.clone(),
|
||||
input_roots,
|
||||
progress_enabled,
|
||||
rules_db,
|
||||
enable_profiling,
|
||||
Arc::clone(shared_profiler),
|
||||
matcher_stats.as_ref(),
|
||||
)?;
|
||||
}
|
||||
|
||||
for repo_root in repo_rx.iter() {
|
||||
enumerate_filesystem_inputs(
|
||||
args,
|
||||
datastore.clone(),
|
||||
&[repo_root.clone()],
|
||||
progress_enabled,
|
||||
rules_db,
|
||||
enable_profiling,
|
||||
Arc::clone(shared_profiler),
|
||||
matcher_stats.as_ref(),
|
||||
)?;
|
||||
streamed_roots.push(repo_root);
|
||||
}
|
||||
for repo_root in repo_rx.iter() {
|
||||
enumerate_filesystem_inputs(
|
||||
args,
|
||||
datastore.clone(),
|
||||
&[repo_root.clone()],
|
||||
progress_enabled,
|
||||
rules_db,
|
||||
enable_profiling,
|
||||
Arc::clone(shared_profiler),
|
||||
matcher_stats.as_ref(),
|
||||
)?;
|
||||
if auto_cleanup_clones && let Err(e) = fs::remove_dir_all(&repo_root) {
|
||||
debug!("Failed to remove scanned clone {}: {e}", repo_root.display());
|
||||
}
|
||||
streamed_roots.push(repo_root);
|
||||
}
|
||||
Ok(())
|
||||
})();
|
||||
input_roots.extend(streamed_roots);
|
||||
|
||||
if let Some(handle) = repo_clone_handle {
|
||||
let _ = handle.join();
|
||||
}
|
||||
let artifact_result = match artifact_handle.join() {
|
||||
Ok(r) => r,
|
||||
Err(_) => Err(anyhow::anyhow!("artifact fetch thread panicked")),
|
||||
};
|
||||
|
||||
// Surface the scan error first; if scanning succeeded, surface any
|
||||
// artifact-fetching error.
|
||||
scan_result?;
|
||||
artifact_result.map_err(|e| e.context("artifact fetching failed"))?;
|
||||
|
||||
deduplicate_new_matches(datastore, global_args, args, 0)?;
|
||||
apply_baseline_if_configured(args, datastore, baseline_path.as_ref(), input_roots)?;
|
||||
|
|
@ -655,6 +787,7 @@ async fn run_parallel_scan(
|
|||
repo_roots: &[PathBuf],
|
||||
repo_rx: crossbeam_channel::Receiver<PathBuf>,
|
||||
repo_clone_handle: Option<std::thread::JoinHandle<()>>,
|
||||
artifact_handle: std::thread::JoinHandle<Result<()>>,
|
||||
shared_profiler: &Arc<ConcurrentRuleProfiler>,
|
||||
enable_profiling: bool,
|
||||
matcher_stats: &Arc<Mutex<MatcherStats>>,
|
||||
|
|
@ -665,6 +798,7 @@ async fn run_parallel_scan(
|
|||
start_time: Instant,
|
||||
scan_started_at: chrono::DateTime<chrono::Local>,
|
||||
update_status: &crate::update::UpdateStatus,
|
||||
auto_cleanup_clones: bool,
|
||||
) -> Result<()> {
|
||||
deduplicate_new_matches(datastore, global_args, args, 0)?;
|
||||
apply_baseline_if_configured(args, datastore, baseline_path.as_ref(), repo_roots)?;
|
||||
|
|
@ -709,7 +843,15 @@ async fn run_parallel_scan(
|
|||
.build()
|
||||
.context("Failed to build repo scan thread pool")?
|
||||
.scope(|scope| {
|
||||
let spawn_repo_scan = |root: PathBuf| {
|
||||
// Distinguishes user-supplied `repo_roots` (must be preserved)
|
||||
// from clones / artifact dirs that arrive via `repo_rx` and
|
||||
// are eligible for post-scan cleanup.
|
||||
#[derive(Clone, Copy)]
|
||||
enum ScanRootSource {
|
||||
UserPath,
|
||||
Streamed,
|
||||
}
|
||||
let spawn_repo_scan = |root: PathBuf, source: ScanRootSource| {
|
||||
let repo_rules = repo_rules.clone();
|
||||
let base_clone_root = base_clone_root.clone();
|
||||
let baseline_path = Arc::clone(baseline_path);
|
||||
|
|
@ -815,21 +957,34 @@ async fn run_parallel_scan(
|
|||
error!("Repository scan failed: {e}");
|
||||
repo_errors.lock().unwrap().push(e);
|
||||
}
|
||||
|
||||
if matches!(source, ScanRootSource::Streamed)
|
||||
&& auto_cleanup_clones
|
||||
&& let Err(e) = fs::remove_dir_all(&root)
|
||||
{
|
||||
debug!("Failed to remove scanned clone {}: {e}", root.display());
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
for root in repo_roots.iter().cloned() {
|
||||
spawn_repo_scan(root);
|
||||
spawn_repo_scan(root, ScanRootSource::UserPath);
|
||||
}
|
||||
|
||||
for root in repo_rx.iter() {
|
||||
spawn_repo_scan(root);
|
||||
spawn_repo_scan(root, ScanRootSource::Streamed);
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(handle) = repo_clone_handle {
|
||||
let _ = handle.join();
|
||||
}
|
||||
// Surface artifact-fetching errors after all per-repo scans have finished.
|
||||
match artifact_handle.join() {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => return Err(e.context("artifact fetching failed")),
|
||||
Err(_) => return Err(anyhow::anyhow!("artifact fetch thread panicked")),
|
||||
}
|
||||
|
||||
if let Some(err) = repo_errors.lock().unwrap().pop() {
|
||||
return Err(err);
|
||||
|
|
|
|||
|
|
@ -221,6 +221,7 @@ fn run_skiplist(skip_regex: Vec<String>, skip_skipword: Vec<String>) -> Result<u
|
|||
Arc::clone(&datastore),
|
||||
&rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
))?;
|
||||
|
||||
let x = Ok(datastore.lock().unwrap().get_matches().len());
|
||||
|
|
|
|||
|
|
@ -200,7 +200,8 @@ fn test_bitbucket_remote_scan() -> Result<()> {
|
|||
let update_status = UpdateStatus::default();
|
||||
|
||||
runtime.block_on(async {
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false)
|
||||
.await
|
||||
})?;
|
||||
|
||||
let ds = datastore.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -231,6 +231,7 @@ rules:
|
|||
Arc::clone(&datastore),
|
||||
&rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
))?;
|
||||
|
||||
let x = Ok(datastore.lock().unwrap().get_matches().len());
|
||||
|
|
|
|||
|
|
@ -209,7 +209,8 @@ fn test_github_remote_scan() -> Result<()> {
|
|||
let update_status = UpdateStatus::default();
|
||||
// Run the scan using runtime.block_on
|
||||
runtime.block_on(async {
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false)
|
||||
.await
|
||||
})?;
|
||||
// Get scan results
|
||||
let ds = datastore.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -206,7 +206,8 @@ fn test_gitlab_remote_scan() -> Result<()> {
|
|||
let update_status = UpdateStatus::default();
|
||||
|
||||
rt.block_on(async {
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false)
|
||||
.await
|
||||
})?;
|
||||
|
||||
let ds = datastore.lock().unwrap();
|
||||
|
|
@ -390,7 +391,8 @@ fn test_gitlab_remote_scan_no_history() -> Result<()> {
|
|||
let update_status = UpdateStatus::default();
|
||||
|
||||
rt.block_on(async {
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status).await
|
||||
run_scan(&global_args, &scan_args, &rules_db, Arc::clone(&datastore), &update_status, false)
|
||||
.await
|
||||
})?;
|
||||
|
||||
let ds = datastore.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -277,8 +277,15 @@ async fn test_scan_postman_all() -> Result<()> {
|
|||
let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir)));
|
||||
let update_status = UpdateStatus::default();
|
||||
|
||||
run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status)
|
||||
.await?;
|
||||
run_async_scan(
|
||||
&global_args,
|
||||
&scan_args,
|
||||
Arc::clone(&datastore),
|
||||
&rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let ds = datastore.lock().unwrap();
|
||||
let findings = ds.get_matches().len();
|
||||
|
|
|
|||
|
|
@ -182,8 +182,15 @@ async fn test_redact_hashes_finding_values() -> Result<()> {
|
|||
let update_status = UpdateStatus::default();
|
||||
|
||||
let datastore = Arc::new(Mutex::new(FindingsStore::new(temp_dir.path().to_path_buf())));
|
||||
run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status)
|
||||
.await?;
|
||||
run_async_scan(
|
||||
&global_args,
|
||||
&scan_args,
|
||||
Arc::clone(&datastore),
|
||||
&rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let ds = datastore.lock().unwrap();
|
||||
let matches = ds.get_matches();
|
||||
|
|
|
|||
|
|
@ -352,8 +352,15 @@ async fn test_scan_slack_messages() -> Result<()> {
|
|||
let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir)));
|
||||
let update_status = UpdateStatus::default();
|
||||
|
||||
run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &ctx.rules_db, &update_status)
|
||||
.await?;
|
||||
run_async_scan(
|
||||
&global_args,
|
||||
&scan_args,
|
||||
Arc::clone(&datastore),
|
||||
&ctx.rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let findings = {
|
||||
let ds = datastore.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -224,8 +224,15 @@ async fn test_scan_teams_messages() -> Result<()> {
|
|||
let datastore = Arc::new(Mutex::new(FindingsStore::new(clone_dir)));
|
||||
let update_status = UpdateStatus::default();
|
||||
|
||||
run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status)
|
||||
.await?;
|
||||
run_async_scan(
|
||||
&global_args,
|
||||
&scan_args,
|
||||
Arc::clone(&datastore),
|
||||
&rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let findings = {
|
||||
let ds = datastore.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -275,8 +275,15 @@ async fn test_validation_cache_and_depvars() -> Result<()> {
|
|||
};
|
||||
let update_status = UpdateStatus::default();
|
||||
|
||||
run_async_scan(&global_args, &scan_args, Arc::clone(&datastore), &rules_db, &update_status)
|
||||
.await?;
|
||||
run_async_scan(
|
||||
&global_args,
|
||||
&scan_args,
|
||||
Arc::clone(&datastore),
|
||||
&rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
/* --------------------------------------------------------- *
|
||||
* 6. Assertions *
|
||||
|
|
|
|||
|
|
@ -362,6 +362,7 @@ impl TestContext {
|
|||
Arc::clone(&datastore),
|
||||
&self.rules_db,
|
||||
&update_status,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue