From 2c08659563e82f57f99f9fe7a4213572321ea7bb Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Thu, 30 Apr 2026 00:32:49 -0700 Subject: [PATCH 1/4] copilot fixes --- .../kingfisher-rules/data/rules/airbrake.yml | 3 --- src/reporter/json_format.rs | 20 +++++++++++++++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/crates/kingfisher-rules/data/rules/airbrake.yml b/crates/kingfisher-rules/data/rules/airbrake.yml index e755027..8275640 100644 --- a/crates/kingfisher-rules/data/rules/airbrake.yml +++ b/crates/kingfisher-rules/data/rules/airbrake.yml @@ -33,9 +33,6 @@ rules: - status: - 200 type: StatusMatch - - words: - - '"id"' - type: WordMatch - words: - '"type":"Unauthorized"' type: WordMatch diff --git a/src/reporter/json_format.rs b/src/reporter/json_format.rs index 8843fdc..2c60f41 100644 --- a/src/reporter/json_format.rs +++ b/src/reporter/json_format.rs @@ -12,8 +12,24 @@ impl DetailsReporter { // scan path: one envelope per repo) concatenate into valid // JSONL that `kingfisher view` can parse. Pipe through `jq .` // for human-readable pretty output. - serde_json::to_writer(&mut writer, &envelope)?; - writeln!(writer)?; + // + // Serialize into a single buffer and write it atomically while + // holding stdout's reentrant lock. The parallel scan path + // emits one envelope per repo from many rayon threads, each + // with its own `BufWriter`. Without this lock, the + // multiple `write()` calls produced by `serde_json::to_writer` + // and the eventual BufWriter flush can interleave across + // threads and corrupt JSONL lines. Stdout's lock is reentrant + // so internal `Stdout::write` calls during the flush don't + // deadlock with the explicit lock acquired here. For non-stdout + // writers (e.g. file output) the stdout lock is harmless extra + // contention. + let mut buf = Vec::with_capacity(8 * 1024); + serde_json::to_writer(&mut buf, &envelope)?; + buf.push(b'\n'); + let _stdout_lock = std::io::stdout().lock(); + writer.write_all(&buf)?; + writer.flush()?; } Ok(()) } From 06f72ec9f0c3b6d4679d70bbde2456f787f11e96 Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Thu, 30 Apr 2026 08:38:14 -0700 Subject: [PATCH 2/4] copilot fixes --- src/reporter/json_format.rs | 20 ++++++++------------ src/scanner/runner.rs | 8 ++++++++ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/reporter/json_format.rs b/src/reporter/json_format.rs index 2c60f41..40cb600 100644 --- a/src/reporter/json_format.rs +++ b/src/reporter/json_format.rs @@ -13,21 +13,17 @@ impl DetailsReporter { // JSONL that `kingfisher view` can parse. Pipe through `jq .` // for human-readable pretty output. // - // Serialize into a single buffer and write it atomically while - // holding stdout's reentrant lock. The parallel scan path - // emits one envelope per repo from many rayon threads, each - // with its own `BufWriter`. Without this lock, the - // multiple `write()` calls produced by `serde_json::to_writer` - // and the eventual BufWriter flush can interleave across - // threads and corrupt JSONL lines. Stdout's lock is reentrant - // so internal `Stdout::write` calls during the flush don't - // deadlock with the explicit lock acquired here. For non-stdout - // writers (e.g. file output) the stdout lock is harmless extra - // contention. + // Serialize into a single buffer and emit via a single + // `write_all` + `flush` so callers that need cross-thread + // atomicity (e.g. the parallel scan path emitting one envelope + // per repo to stdout) can synchronize at the call site by + // holding `std::io::stdout().lock()` around this call. We + // intentionally do NOT acquire the stdout lock here because + // this method is generic over any `Write` and is also called + // with file writers and `Cursor>` in tests. let mut buf = Vec::with_capacity(8 * 1024); serde_json::to_writer(&mut buf, &envelope)?; buf.push(b'\n'); - let _stdout_lock = std::io::stdout().lock(); writer.write_all(&buf)?; writer.flush()?; } diff --git a/src/scanner/runner.rs b/src/scanner/runner.rs index c9a034b..5ae9b2c 100644 --- a/src/scanner/runner.rs +++ b/src/scanner/runner.rs @@ -941,6 +941,14 @@ async fn run_parallel_scan( } if !output_to_file { + // Per-repo emit goes to stdout from many rayon + // threads in parallel. Hold stdout's reentrant + // lock for the duration of `reporter::run` so + // the report's writes (and the eventual + // `BufWriter::flush` on drop) can't + // interleave with another thread's report, + // which would otherwise corrupt JSONL output. + let _stdout_lock = std::io::stdout().lock(); crate::reporter::run( global_args, Arc::clone(&repo_datastore), From b7b6dfdeb2bff974ad32b041045a00217d2143b3 Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Thu, 30 Apr 2026 09:02:49 -0700 Subject: [PATCH 3/4] copilot fixes --- src/reporter.rs | 19 ++++++++++++++++--- src/scanner/runner.rs | 23 +++++++++++++++-------- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/src/reporter.rs b/src/reporter.rs index e4b0623..b38a45e 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -506,6 +506,22 @@ pub fn run( ds: Arc>, args: &cli::commands::scan::ScanArgs, audit_context: Option, +) -> Result<()> { + let writer = args.output_args.get_writer()?; + run_with_writer(global_args, ds, args, audit_context, writer) +} + +/// Same as [`run`], but writes into a caller-provided `Write` instead of +/// constructing one from `args.output_args`. Useful when the caller wants +/// to render into an in-memory buffer first (e.g. so a stdout lock can be +/// held only around the final atomic emit, not around the report's CPU +/// work). +pub fn run_with_writer( + global_args: &GlobalArgs, + ds: Arc>, + args: &cli::commands::scan::ScanArgs, + audit_context: Option, + writer: W, ) -> Result<()> { global_args.use_color(std::io::stdout()); let stdout_is_tty = std::io::stdout().is_terminal(); @@ -513,11 +529,8 @@ pub fn run( let styles = Styles::new(use_color); let ds_clone = Arc::clone(&ds); - // Initialize the reporter let reporter = DetailsReporter { datastore: ds_clone, styles, only_valid: args.only_valid, audit_context }; - let writer = args.output_args.get_writer()?; - // Generate and write the report in the specified format reporter.report(args.output_args.format, writer, args) } pub struct DetailsReporter { diff --git a/src/scanner/runner.rs b/src/scanner/runner.rs index 5ae9b2c..fbfe8d1 100644 --- a/src/scanner/runner.rs +++ b/src/scanner/runner.rs @@ -942,20 +942,27 @@ async fn run_parallel_scan( if !output_to_file { // Per-repo emit goes to stdout from many rayon - // threads in parallel. Hold stdout's reentrant - // lock for the duration of `reporter::run` so - // the report's writes (and the eventual - // `BufWriter::flush` on drop) can't - // interleave with another thread's report, - // which would otherwise corrupt JSONL output. - let _stdout_lock = std::io::stdout().lock(); - crate::reporter::run( + // threads in parallel. Render the report into + // an in-memory buffer first (CPU work, no + // contention), then take the stdout lock only + // around the final atomic write+flush so two + // threads' envelopes can't interleave and + // corrupt JSONL output. + let mut buf: Vec = Vec::with_capacity(8 * 1024); + crate::reporter::run_with_writer( global_args, Arc::clone(&repo_datastore), &args, None, + &mut buf, ) .context("Failed to run report command")?; + if !buf.is_empty() { + use std::io::Write; + let mut stdout = std::io::stdout().lock(); + stdout.write_all(&buf)?; + stdout.flush()?; + } } { From 90737f098ce0b387e4ce8ecf12c08ad1247dc590 Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Thu, 30 Apr 2026 09:29:23 -0700 Subject: [PATCH 4/4] copilot fixes --- .../kingfisher-rules/data/rules/airbrake.yml | 1 + src/reporter/json_format.rs | 18 ++++++++++-------- src/scanner/runner.rs | 18 ++++++++++++++++-- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/crates/kingfisher-rules/data/rules/airbrake.yml b/crates/kingfisher-rules/data/rules/airbrake.yml index 8275640..e94c488 100644 --- a/crates/kingfisher-rules/data/rules/airbrake.yml +++ b/crates/kingfisher-rules/data/rules/airbrake.yml @@ -33,6 +33,7 @@ rules: - status: - 200 type: StatusMatch + - type: JsonValid - words: - '"type":"Unauthorized"' type: WordMatch diff --git a/src/reporter/json_format.rs b/src/reporter/json_format.rs index 40cb600..c796383 100644 --- a/src/reporter/json_format.rs +++ b/src/reporter/json_format.rs @@ -14,18 +14,20 @@ impl DetailsReporter { // for human-readable pretty output. // // Serialize into a single buffer and emit via a single - // `write_all` + `flush` so callers that need cross-thread - // atomicity (e.g. the parallel scan path emitting one envelope - // per repo to stdout) can synchronize at the call site by - // holding `std::io::stdout().lock()` around this call. We - // intentionally do NOT acquire the stdout lock here because - // this method is generic over any `Write` and is also called - // with file writers and `Cursor>` in tests. + // `write_all` so callers that need cross-thread atomicity + // (e.g. the parallel scan path emitting one envelope per repo + // to stdout) can synchronize at the call site by holding + // `std::io::stdout().lock()` around this call. We intentionally + // do NOT acquire the stdout lock here because this method is + // generic over any `Write` and is also called with file + // writers and `Cursor>` in tests. Flushing is the + // caller's responsibility — flushing here would defeat + // upstream `BufWriter` buffering and turn an otherwise-benign + // BrokenPipe into a hard error. let mut buf = Vec::with_capacity(8 * 1024); serde_json::to_writer(&mut buf, &envelope)?; buf.push(b'\n'); writer.write_all(&buf)?; - writer.flush()?; } Ok(()) } diff --git a/src/scanner/runner.rs b/src/scanner/runner.rs index fbfe8d1..384e0d0 100644 --- a/src/scanner/runner.rs +++ b/src/scanner/runner.rs @@ -960,8 +960,22 @@ async fn run_parallel_scan( if !buf.is_empty() { use std::io::Write; let mut stdout = std::io::stdout().lock(); - stdout.write_all(&buf)?; - stdout.flush()?; + // Treat a closed downstream pipe (e.g. + // `kingfisher scan ... | head`) as a normal + // early exit, matching `summary.rs::safe_println!`. + // Any other I/O error is a real failure. + if let Err(err) = stdout.write_all(&buf) { + if err.kind() == std::io::ErrorKind::BrokenPipe { + std::process::exit(0); + } + return Err(err.into()); + } + if let Err(err) = stdout.flush() { + if err.kind() == std::io::ErrorKind::BrokenPipe { + std::process::exit(0); + } + return Err(err.into()); + } } }