diff --git a/crates/kingfisher-rules/data/rules/airbrake.yml b/crates/kingfisher-rules/data/rules/airbrake.yml index e755027..e94c488 100644 --- a/crates/kingfisher-rules/data/rules/airbrake.yml +++ b/crates/kingfisher-rules/data/rules/airbrake.yml @@ -33,9 +33,7 @@ rules: - status: - 200 type: StatusMatch - - words: - - '"id"' - type: WordMatch + - type: JsonValid - words: - '"type":"Unauthorized"' type: WordMatch diff --git a/src/reporter.rs b/src/reporter.rs index e4b0623..b38a45e 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -506,6 +506,22 @@ pub fn run( ds: Arc>, args: &cli::commands::scan::ScanArgs, audit_context: Option, +) -> Result<()> { + let writer = args.output_args.get_writer()?; + run_with_writer(global_args, ds, args, audit_context, writer) +} + +/// Same as [`run`], but writes into a caller-provided `Write` instead of +/// constructing one from `args.output_args`. Useful when the caller wants +/// to render into an in-memory buffer first (e.g. so a stdout lock can be +/// held only around the final atomic emit, not around the report's CPU +/// work). +pub fn run_with_writer( + global_args: &GlobalArgs, + ds: Arc>, + args: &cli::commands::scan::ScanArgs, + audit_context: Option, + writer: W, ) -> Result<()> { global_args.use_color(std::io::stdout()); let stdout_is_tty = std::io::stdout().is_terminal(); @@ -513,11 +529,8 @@ pub fn run( let styles = Styles::new(use_color); let ds_clone = Arc::clone(&ds); - // Initialize the reporter let reporter = DetailsReporter { datastore: ds_clone, styles, only_valid: args.only_valid, audit_context }; - let writer = args.output_args.get_writer()?; - // Generate and write the report in the specified format reporter.report(args.output_args.format, writer, args) } pub struct DetailsReporter { diff --git a/src/reporter/json_format.rs b/src/reporter/json_format.rs index 8843fdc..c796383 100644 --- a/src/reporter/json_format.rs +++ b/src/reporter/json_format.rs @@ -12,8 +12,22 @@ impl DetailsReporter { // scan path: one envelope per repo) concatenate into valid // JSONL that `kingfisher view` can parse. Pipe through `jq .` // for human-readable pretty output. - serde_json::to_writer(&mut writer, &envelope)?; - writeln!(writer)?; + // + // Serialize into a single buffer and emit via a single + // `write_all` so callers that need cross-thread atomicity + // (e.g. the parallel scan path emitting one envelope per repo + // to stdout) can synchronize at the call site by holding + // `std::io::stdout().lock()` around this call. We intentionally + // do NOT acquire the stdout lock here because this method is + // generic over any `Write` and is also called with file + // writers and `Cursor>` in tests. Flushing is the + // caller's responsibility — flushing here would defeat + // upstream `BufWriter` buffering and turn an otherwise-benign + // BrokenPipe into a hard error. + let mut buf = Vec::with_capacity(8 * 1024); + serde_json::to_writer(&mut buf, &envelope)?; + buf.push(b'\n'); + writer.write_all(&buf)?; } Ok(()) } diff --git a/src/scanner/runner.rs b/src/scanner/runner.rs index c9a034b..384e0d0 100644 --- a/src/scanner/runner.rs +++ b/src/scanner/runner.rs @@ -941,13 +941,42 @@ async fn run_parallel_scan( } if !output_to_file { - crate::reporter::run( + // Per-repo emit goes to stdout from many rayon + // threads in parallel. Render the report into + // an in-memory buffer first (CPU work, no + // contention), then take the stdout lock only + // around the final atomic write+flush so two + // threads' envelopes can't interleave and + // corrupt JSONL output. + let mut buf: Vec = Vec::with_capacity(8 * 1024); + crate::reporter::run_with_writer( global_args, Arc::clone(&repo_datastore), &args, None, + &mut buf, ) .context("Failed to run report command")?; + if !buf.is_empty() { + use std::io::Write; + let mut stdout = std::io::stdout().lock(); + // Treat a closed downstream pipe (e.g. + // `kingfisher scan ... | head`) as a normal + // early exit, matching `summary.rs::safe_println!`. + // Any other I/O error is a real failure. + if let Err(err) = stdout.write_all(&buf) { + if err.kind() == std::io::ErrorKind::BrokenPipe { + std::process::exit(0); + } + return Err(err.into()); + } + if let Err(err) = stdout.flush() { + if err.kind() == std::io::ErrorKind::BrokenPipe { + std::process::exit(0); + } + return Err(err.into()); + } + } } {