From f71b9d826dfcd1832627c85f33183891a0d4a6b4 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Thu, 21 May 2026 21:15:41 -0400 Subject: [PATCH 1/8] fix(jwt): unify jsonwebtoken crypto backend --- Cargo.lock | 13 +++---------- Cargo.toml | 3 ++- tests/int_jwt_provider.rs | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 11 deletions(-) create mode 100644 tests/int_jwt_provider.rs diff --git a/Cargo.lock b/Cargo.lock index 85f94fc..bba96ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -377,7 +377,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" dependencies = [ "aws-lc-sys", - "untrusted 0.7.1", "zeroize", ] @@ -4930,7 +4929,6 @@ version = "10.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eba32bfb4ffdeaca3e34431072faf01745c9b26d25504aa7a6cf5684334fc4fc" dependencies = [ - "aws-lc-rs", "base64", "ed25519-dalek", "getrandom 0.2.17", @@ -5020,6 +5018,7 @@ dependencies = [ "indenter", "indicatif", "ipnet", + "jsonwebtoken", "kingfisher-core", "kingfisher-rules", "kingfisher-scanner", @@ -7160,7 +7159,7 @@ dependencies = [ "cfg-if", "getrandom 0.2.17", "libc", - "untrusted 0.9.0", + "untrusted", "windows-sys 0.52.0", ] @@ -7355,7 +7354,7 @@ dependencies = [ "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted 0.9.0", + "untrusted", ] [[package]] @@ -9051,12 +9050,6 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index e6876a9..e8d0def 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -220,7 +220,7 @@ aws-sdk-ssm = { version = "1.102.0", default-features = false, features = ["defa gcloud-storage = { version = "1.1.1", default-features = false, features = [ "rustls-tls", "auth", - "jwt-aws-lc-rs", + "jwt-rust-crypto", ] } tokei = "14.0.0" crc32fast = "1.5.0" @@ -249,6 +249,7 @@ testcontainers = "0.27.2" predicates = "3.1.3" assert_cmd = "2.1.1" proptest = "1.9.0" +jsonwebtoken = { version = "10.4.0", default-features = false, features = ["rust_crypto"] } [profile.release] debug = false diff --git a/tests/int_jwt_provider.rs b/tests/int_jwt_provider.rs new file mode 100644 index 0000000..704ebe9 --- /dev/null +++ b/tests/int_jwt_provider.rs @@ -0,0 +1,34 @@ +use jsonwebtoken::DecodingKey; +use kingfisher_scanner::validation::jwt::{ValidateOptions, validate_jwt_with}; + +const RS256_TOKEN: &str = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJtb2NrLXN1YmplY3QiLCJuYmYiOjAsImV4cCI6NDEwMjQ0NDgwMH0.T87uqt_EI9ISXFmfn2hVTJa-sDTF2xWjNl0Fo6ZClM3_bvdyEB5BWzkIjDmQGbXjP1iVGHv59esuoHjeRYR_S7cBBIM-J2ZWuR_FfVSwjI-jxDlQGw8BFBN6qqpX2dBQfe0NmJ4GzBmQmyPX9GVNlw6zZvW0SGnaX5GcD7HOCqoZQhkiI4W1zTCQ_J4OjJnMwdNg6XkquwBj_yV-VKx_9NYXXTCjl6JtFBF9ZP2X3I58sLSOTzbkTSwSHfLpWLxWfzEYItwHALsK_fBAYMlSZwRvHpRBc48Tqg_2hjOi8j2qQiMbPDTNJJDnt1jEz0JeYahH8N7aJzIPEmd2HXFdKw"; + +const RSA_PUBLIC_KEY_PEM: &str = r#"-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2OcytZklidtKr63saWAt +CnwQmMS8W7OEpbnrP746SSR/gkkrNYBkW3POX3T9dcaf4Ozn50QuFGUqBdCAvHUS +9ZFjubPXsqaxOY9R1eiQt8V+0mf1yI7Q9KCygbqZvilyJ6//kvWTKWA5N9A48J69 +wkkxuDXnhmSK0zwuNOetphuQNtVuCvePrvrI9OkcYp8EC2qtJi6oxy+0dI9lCN5+ +qQyxWDAJVtPw1I/xSZFzMdFrpZWA65VcqKVqjCEB4bHAc15S7UCuLEgBFlqQEndk +6qTKCy0cVm7LqMOLuNJzbhzNU5caXbEYu6uzzU4vLgIdWpIr09dpNxFl+oA0zbMa +vQIDAQAB +-----END PUBLIC KEY-----"#; + +#[tokio::test] +async fn validate_jwt_with_fallback_key_handles_rs256_without_panicking() { + let opts = ValidateOptions { + allow_alg_none: false, + fallback_decoding_key: Some( + DecodingKey::from_rsa_pem(RSA_PUBLIC_KEY_PEM.as_bytes()).expect("valid RSA key"), + ), + }; + + let (ok, message) = validate_jwt_with(RS256_TOKEN, &opts, false, false) + .await + .expect("RS256 validation should not panic or error"); + + assert!(ok, "expected JWT signature verification to succeed: {message}"); + assert!( + message.contains("JWT valid via fallback key"), + "unexpected validation message: {message}" + ); +} From ebd8acfc1ba3f6c44946ebf2341a416338ac0c0d Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Thu, 21 May 2026 21:56:01 -0400 Subject: [PATCH 2/8] test(jwt): generate ephemeral RSA keypair in RS256 regression test - Replace the inline RS256 token and committed public key with a throwaway RSA keypair generated at runtime; the token is signed from readable claims so no key material or opaque blobs live in the repo - Add `rsa` as a dev-only dependency (getrandom feature) for in-test key generation; release binary is unaffected Addresses review feedback on #386. --- Cargo.lock | 1 + Cargo.toml | 3 +++ tests/int_jwt_provider.rs | 50 ++++++++++++++++++++++++++------------- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bba96ba..2327f89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5045,6 +5045,7 @@ dependencies = [ "regex", "reqwest 0.12.28", "roaring", + "rsa", "rusqlite", "rustc-hash", "rustls", diff --git a/Cargo.toml b/Cargo.toml index e8d0def..fca12db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -250,6 +250,9 @@ predicates = "3.1.3" assert_cmd = "2.1.1" proptest = "1.9.0" jsonwebtoken = { version = "10.4.0", default-features = false, features = ["rust_crypto"] } +# Test-only: generate an ephemeral RSA keypair for the RS256 JWT regression test. +# `getrandom` enables `rsa::rand_core::OsRng`; `pem` is on by default for PEM export. +rsa = { version = "0.9.10", features = ["getrandom"] } [profile.release] debug = false diff --git a/tests/int_jwt_provider.rs b/tests/int_jwt_provider.rs index 704ebe9..1e4a3d2 100644 --- a/tests/int_jwt_provider.rs +++ b/tests/int_jwt_provider.rs @@ -1,34 +1,52 @@ -use jsonwebtoken::DecodingKey; +use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, encode}; use kingfisher_scanner::validation::jwt::{ValidateOptions, validate_jwt_with}; +use rsa::RsaPrivateKey; +use rsa::pkcs1::{EncodeRsaPrivateKey, LineEnding}; +use rsa::pkcs8::EncodePublicKey; +use rsa::rand_core::OsRng; -const RS256_TOKEN: &str = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJtb2NrLXN1YmplY3QiLCJuYmYiOjAsImV4cCI6NDEwMjQ0NDgwMH0.T87uqt_EI9ISXFmfn2hVTJa-sDTF2xWjNl0Fo6ZClM3_bvdyEB5BWzkIjDmQGbXjP1iVGHv59esuoHjeRYR_S7cBBIM-J2ZWuR_FfVSwjI-jxDlQGw8BFBN6qqpX2dBQfe0NmJ4GzBmQmyPX9GVNlw6zZvW0SGnaX5GcD7HOCqoZQhkiI4W1zTCQ_J4OjJnMwdNg6XkquwBj_yV-VKx_9NYXXTCjl6JtFBF9ZP2X3I58sLSOTzbkTSwSHfLpWLxWfzEYItwHALsK_fBAYMlSZwRvHpRBc48Tqg_2hjOi8j2qQiMbPDTNJJDnt1jEz0JeYahH8N7aJzIPEmd2HXFdKw"; - -const RSA_PUBLIC_KEY_PEM: &str = r#"-----BEGIN PUBLIC KEY----- -MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2OcytZklidtKr63saWAt -CnwQmMS8W7OEpbnrP746SSR/gkkrNYBkW3POX3T9dcaf4Ozn50QuFGUqBdCAvHUS -9ZFjubPXsqaxOY9R1eiQt8V+0mf1yI7Q9KCygbqZvilyJ6//kvWTKWA5N9A48J69 -wkkxuDXnhmSK0zwuNOetphuQNtVuCvePrvrI9OkcYp8EC2qtJi6oxy+0dI9lCN5+ -qQyxWDAJVtPw1I/xSZFzMdFrpZWA65VcqKVqjCEB4bHAc15S7UCuLEgBFlqQEndk -6qTKCy0cVm7LqMOLuNJzbhzNU5caXbEYu6uzzU4vLgIdWpIr09dpNxFl+oA0zbMa -vQIDAQAB ------END PUBLIC KEY-----"#; - +/// Regression test for the `jsonwebtoken` CryptoProvider panic (see issue #385). +/// +/// It exercises the asymmetric (RS256) verification path through `validate_jwt_with` +/// via the fallback decoding key. A throwaway RSA keypair is generated at runtime and +/// the token is signed from readable claims, so no opaque token blobs or key material +/// are committed to the repository. #[tokio::test] async fn validate_jwt_with_fallback_key_handles_rs256_without_panicking() { + // Generate an ephemeral RSA keypair for this test run only. + let mut rng = OsRng; + let private_key = RsaPrivateKey::new(&mut rng, 2048).expect("generate RSA key"); + let private_pem = private_key.to_pkcs1_pem(LineEnding::LF).expect("encode private key"); + let public_pem = + private_key.to_public_key().to_public_key_pem(LineEnding::LF).expect("encode public key"); + + // Omitting `iss` routes validation through the fallback-key path (the one that panicked). + let claims = serde_json::json!({ + "sub": "mock-subject", + "nbf": 0, + "exp": 4_102_444_800_i64, // year 2100, so the token never expires during CI + }); + let token = encode( + &Header::new(Algorithm::RS256), + &claims, + &EncodingKey::from_rsa_pem(private_pem.as_bytes()).expect("valid encoding key"), + ) + .expect("sign RS256 token"); + let opts = ValidateOptions { allow_alg_none: false, fallback_decoding_key: Some( - DecodingKey::from_rsa_pem(RSA_PUBLIC_KEY_PEM.as_bytes()).expect("valid RSA key"), + DecodingKey::from_rsa_pem(public_pem.as_bytes()).expect("valid RSA key"), ), }; - let (ok, message) = validate_jwt_with(RS256_TOKEN, &opts, false, false) + let (ok, message) = validate_jwt_with(&token, &opts, false, false) .await .expect("RS256 validation should not panic or error"); assert!(ok, "expected JWT signature verification to succeed: {message}"); assert!( message.contains("JWT valid via fallback key"), - "unexpected validation message: {message}" + "expected the fallback-key verification path: {message}" ); } From d2e4e2f7376d947b061344d3b16761ee2d10b549 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Thu, 21 May 2026 21:44:38 -0400 Subject: [PATCH 3/8] fix(validation): contain validator panics --- src/scanner/validation.rs | 191 +++++++++++++++++++++++++++++++++----- 1 file changed, 167 insertions(+), 24 deletions(-) diff --git a/src/scanner/validation.rs b/src/scanner/validation.rs index fbdb7d2..11b4abe 100644 --- a/src/scanner/validation.rs +++ b/src/scanner/validation.rs @@ -1,4 +1,6 @@ use std::{ + future::Future, + panic::AssertUnwindSafe, sync::{ Arc, Mutex, atomic::{AtomicUsize, Ordering}, @@ -901,25 +903,44 @@ async fn validate_single( // Perform validation let outcome = timeout( validation_timeout, - validate_single_match( - om, - parser, - clients, - dep_vars, - missing_deps, - cache2, - validation_timeout, - validation_retries, - rate_limiter, - provider_endpoints.as_ref(), - max_body_len, - ) - .boxed(), + catch_validation_panic( + validate_single_match( + om, + parser, + clients, + dep_vars, + missing_deps, + cache2, + validation_timeout, + validation_retries, + rate_limiter, + provider_endpoints.as_ref(), + max_body_len, + ) + .boxed(), + ), ) .await; - // Store result in cache + apply_validation_outcome(om, &cache_key, outcome, success_count, fail_count, cache); + maybe_record_access_map(om, access_map); + // Remove from `in_progress` + // in_progress.remove(&cache_key); + in_progress.remove(&cache_key); + if let Some(n) = NOTIFY.remove(&cache_key) { + n.1.notify_waiters(); // wake everyone + } +} + +fn apply_validation_outcome( + om: &mut OwnedBlobMatch, + cache_key: &str, + outcome: std::result::Result, tokio::time::error::Elapsed>, + success_count: &AtomicUsize, + fail_count: &AtomicUsize, + cache: &DashMap, +) { match outcome { - Ok(_) => { + Ok(Ok(())) => { if om.validation_success && is_counted_validation_status(om.validation_response_status) { success_count.fetch_add(1, Ordering::Relaxed); @@ -927,7 +948,26 @@ async fn validate_single( fail_count.fetch_add(1, Ordering::Relaxed); } cache.insert( - cache_key.clone(), + cache_key.to_owned(), + CachedResponse { + is_valid: om.validation_success, + status: om.validation_response_status, + body: om.validation_response_body.clone(), + timestamp: Instant::now(), + }, + ); + } + Ok(Err(panic_message)) => { + om.validation_success = false; + om.validation_response_body = validation_body::from_string(format!( + "Validation panicked for rule {}: {}", + om.rule.id(), + panic_message + )); + om.validation_response_status = http::StatusCode::INTERNAL_SERVER_ERROR; + fail_count.fetch_add(1, Ordering::Relaxed); + cache.insert( + cache_key.to_owned(), CachedResponse { is_valid: om.validation_success, status: om.validation_response_status, @@ -943,19 +983,32 @@ async fn validate_single( fail_count.fetch_add(1, Ordering::Relaxed); } } - maybe_record_access_map(om, access_map); - // Remove from `in_progress` - // in_progress.remove(&cache_key); - in_progress.remove(&cache_key); - if let Some(n) = NOTIFY.remove(&cache_key) { - n.1.notify_waiters(); // wake everyone - } } fn is_counted_validation_status(status: StatusCode) -> bool { !matches!(status, StatusCode::CONTINUE | StatusCode::PRECONDITION_REQUIRED) } +async fn catch_validation_panic(future: F) -> std::result::Result<(), String> +where + F: Future, +{ + match AssertUnwindSafe(future).catch_unwind().await { + Ok(()) => Ok(()), + Err(payload) => Err(describe_panic_payload(payload)), + } +} + +fn describe_panic_payload(payload: Box) -> String { + if let Some(message) = payload.downcast_ref::<&str>() { + (*message).to_string() + } else if let Some(message) = payload.downcast_ref::() { + message.clone() + } else { + "non-string panic payload".to_string() + } +} + // Helper to compute the cache key for an OwnedBlobMatch. fn build_cache_key(om: &OwnedBlobMatch) -> String { let capture0 = om.captures.captures.get(0).map_or(String::new(), |c| c.raw_value().to_string()); @@ -1553,6 +1606,53 @@ fn extract_azure_devops_org_from_body( #[cfg(test)] mod tests { use super::*; + use crate::{ + blob::BlobId, + matcher::{OwnedBlobMatch, SerializableCapture, SerializableCaptures}, + rules::rule::{Confidence, Rule, RuleSyntax}, + util::intern, + }; + use smallvec::smallvec; + use std::sync::Arc; + + fn make_owned_blob_match() -> OwnedBlobMatch { + OwnedBlobMatch { + rule: Arc::new(Rule::new(RuleSyntax { + name: "panic-test".to_string(), + id: "test.panic".to_string(), + pattern: "panic".to_string(), + min_entropy: 0.0, + confidence: Confidence::Low, + visible: true, + examples: vec![], + negative_examples: vec![], + references: vec![], + validation: None, + revocation: None, + depends_on_rule: vec![], + pattern_requirements: None, + tls_mode: None, + })), + blob_id: BlobId::new(b"panic-test-blob"), + finding_fingerprint: 1, + matching_input_offset_span: OffsetSpan { start: 0, end: 5 }, + captures: SerializableCaptures { + captures: smallvec![SerializableCapture { + name: None, + match_number: 0, + start: 0, + end: 5, + value: intern("panic"), + }], + }, + validation_response_body: None, + validation_response_status: StatusCode::CONTINUE, + validation_success: false, + calculated_entropy: 0.0, + is_base64: false, + dependent_captures: std::collections::BTreeMap::new(), + } + } #[test] fn counted_validation_status_excludes_skipped_statuses() { @@ -1604,4 +1704,47 @@ mod tests { other => panic!("unexpected request: {other:?}"), } } + + #[tokio::test] + async fn catch_validation_panic_returns_panic_message() { + let result = catch_validation_panic(async { + panic!("validator blew up"); + }) + .await; + + assert_eq!(result.unwrap_err(), "validator blew up"); + } + + #[tokio::test] + async fn panic_outcome_is_reported_as_failure_and_cached() { + let mut om = make_owned_blob_match(); + let cache_key = build_cache_key(&om); + let cache = DashMap::new(); + let success_count = AtomicUsize::new(0); + let fail_count = AtomicUsize::new(0); + + let outcome = Ok(catch_validation_panic(async { + panic!("validator blew up"); + }) + .await); + + apply_validation_outcome(&mut om, &cache_key, outcome, &success_count, &fail_count, &cache); + + assert!(!om.validation_success); + assert_eq!(om.validation_response_status, StatusCode::INTERNAL_SERVER_ERROR); + assert!( + validation_body::clone_as_string(&om.validation_response_body) + .contains("Validation panicked for rule test.panic: validator blew up") + ); + assert_eq!(success_count.load(Ordering::Relaxed), 0); + assert_eq!(fail_count.load(Ordering::Relaxed), 1); + + let cached = cache.get(&cache_key).expect("panic result should be cached"); + assert!(!cached.is_valid); + assert_eq!(cached.status, StatusCode::INTERNAL_SERVER_ERROR); + assert!( + validation_body::clone_as_string(&cached.body) + .contains("Validation panicked for rule test.panic: validator blew up") + ); + } } From fd13f268f03915f900205bcb147392eddbcaf254 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Thu, 21 May 2026 23:02:57 -0400 Subject: [PATCH 4/8] fix(validation): redact panic payloads and clarify panic handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on the validator panic-containment change: - Keep raw panic payloads out of the cached and user-visible `validation_response_body`, since a panic message can embed secret material (e.g. a token captured in a debug string). The visible body now reports only the stable rule id, and the detailed payload is emitted via truncated structured logging. - Replace the nested `Result, Elapsed>` with a self-describing `ValidationOutcome` enum (`Completed` / `Panicked` / `TimedOut`) so call sites and signatures read clearly. - Document why the `AssertUnwindSafe` panic boundary is sound: the recovery path deterministically resets the match's validation fields, and the shared counters/cache are only mutated after the boundary returns, so an unwind cannot leave them inconsistent. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/scanner/validation.rs | 142 ++++++++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 38 deletions(-) diff --git a/src/scanner/validation.rs b/src/scanner/validation.rs index 11b4abe..6ab7363 100644 --- a/src/scanner/validation.rs +++ b/src/scanner/validation.rs @@ -17,7 +17,7 @@ use liquid::Parser; use reqwest::StatusCode; use rustc_hash::{FxHashMap, FxHashSet}; use tokio::{sync::Notify, time::timeout}; -use tracing::trace; +use tracing::{trace, warn}; use crate::{ access_map::AccessMapRequest, @@ -901,26 +901,28 @@ async fn validate_single( } // If we reach here, we're the first task to validate this key // Perform validation - let outcome = timeout( - validation_timeout, - catch_validation_panic( - validate_single_match( - om, - parser, - clients, - dep_vars, - missing_deps, - cache2, - validation_timeout, - validation_retries, - rate_limiter, - provider_endpoints.as_ref(), - max_body_len, - ) - .boxed(), - ), - ) - .await; + let outcome = ValidationOutcome::from_timeout_result( + timeout( + validation_timeout, + catch_validation_panic( + validate_single_match( + om, + parser, + clients, + dep_vars, + missing_deps, + cache2, + validation_timeout, + validation_retries, + rate_limiter, + provider_endpoints.as_ref(), + max_body_len, + ) + .boxed(), + ), + ) + .await, + ); apply_validation_outcome(om, &cache_key, outcome, success_count, fail_count, cache); maybe_record_access_map(om, access_map); // Remove from `in_progress` @@ -931,16 +933,43 @@ async fn validate_single( } } +/// Result of attempting to validate a single match. +/// +/// Flattens the `timeout(catch_validation_panic(..))` nesting into one +/// self-describing enum so call sites and signatures stay readable. +enum ValidationOutcome { + /// Validation ran to completion; the match's own fields describe whether it + /// succeeded or failed. + Completed, + /// Validation panicked. The payload is captured for logging only and must + /// never be surfaced to the user or cache (it may embed secret material). + Panicked(String), + /// Validation exceeded the configured timeout. + TimedOut, +} + +impl ValidationOutcome { + fn from_timeout_result( + result: std::result::Result, tokio::time::error::Elapsed>, + ) -> Self { + match result { + Ok(Ok(())) => ValidationOutcome::Completed, + Ok(Err(panic_message)) => ValidationOutcome::Panicked(panic_message), + Err(_) => ValidationOutcome::TimedOut, + } + } +} + fn apply_validation_outcome( om: &mut OwnedBlobMatch, cache_key: &str, - outcome: std::result::Result, tokio::time::error::Elapsed>, + outcome: ValidationOutcome, success_count: &AtomicUsize, fail_count: &AtomicUsize, cache: &DashMap, ) { match outcome { - Ok(Ok(())) => { + ValidationOutcome::Completed => { if om.validation_success && is_counted_validation_status(om.validation_response_status) { success_count.fetch_add(1, Ordering::Relaxed); @@ -957,12 +986,20 @@ fn apply_validation_outcome( }, ); } - Ok(Err(panic_message)) => { + ValidationOutcome::Panicked(panic_message) => { + // The panic payload can embed secret material (e.g. a token captured + // in a debug string), so it must never reach the cached or + // user-visible body. Emit the detail through structured logging + // (truncated), and keep the visible body to the stable rule id. + warn!( + rule_id = %om.rule.id(), + panic = %truncate_for_log(&panic_message), + "validator panicked; marking match as failed", + ); om.validation_success = false; om.validation_response_body = validation_body::from_string(format!( - "Validation panicked for rule {}: {}", - om.rule.id(), - panic_message + "Validation panicked for rule {}", + om.rule.id() )); om.validation_response_status = http::StatusCode::INTERNAL_SERVER_ERROR; fail_count.fetch_add(1, Ordering::Relaxed); @@ -976,7 +1013,7 @@ fn apply_validation_outcome( }, ); } - Err(_) => { + ValidationOutcome::TimedOut => { om.validation_success = false; om.validation_response_body = validation_body::from_string("Validation timed out"); om.validation_response_status = http::StatusCode::REQUEST_TIMEOUT; @@ -989,6 +1026,20 @@ fn is_counted_validation_status(status: StatusCode) -> bool { !matches!(status, StatusCode::CONTINUE | StatusCode::PRECONDITION_REQUIRED) } +/// Defensive, last-resort boundary around a validator future. +/// +/// Validators perform network I/O and parse untrusted responses, so a stray +/// `panic!`/`unwrap` would otherwise tear down the entire scan. We catch the +/// unwind here and surface it as `Err(message)` so the caller can fail just the +/// one match. +/// +/// `AssertUnwindSafe` is required because the future borrows `&mut om`. It is +/// sound for this use because the unwind is never observed as a partial result: +/// on the panic path [`apply_validation_outcome`] unconditionally overwrites the +/// match's validation fields (`validation_success`, `validation_response_status`, +/// `validation_response_body`) with a deterministic failure state. The shared +/// counters and response cache are only mutated *after* this boundary returns, +/// so a panic cannot leave them inconsistent. async fn catch_validation_panic(future: F) -> std::result::Result<(), String> where F: Future, @@ -1009,6 +1060,21 @@ fn describe_panic_payload(payload: Box) -> String { } } +/// Bound a panic message before it reaches the logs. Panic payloads are +/// unbounded in length and may be influenced by scanned content, so cap them at +/// a fixed length on a UTF-8 boundary. +fn truncate_for_log(message: &str) -> String { + const MAX_LEN: usize = 256; + if message.len() <= MAX_LEN { + return message.to_string(); + } + let mut end = MAX_LEN; + while !message.is_char_boundary(end) { + end -= 1; + } + format!("{}… (truncated)", &message[..end]) +} + // Helper to compute the cache key for an OwnedBlobMatch. fn build_cache_key(om: &OwnedBlobMatch) -> String { let capture0 = om.captures.captures.get(0).map_or(String::new(), |c| c.raw_value().to_string()); @@ -1723,28 +1789,28 @@ mod tests { let success_count = AtomicUsize::new(0); let fail_count = AtomicUsize::new(0); - let outcome = Ok(catch_validation_panic(async { + let outcome = ValidationOutcome::from_timeout_result(Ok(catch_validation_panic(async { panic!("validator blew up"); }) - .await); + .await)); apply_validation_outcome(&mut om, &cache_key, outcome, &success_count, &fail_count, &cache); assert!(!om.validation_success); assert_eq!(om.validation_response_status, StatusCode::INTERNAL_SERVER_ERROR); - assert!( - validation_body::clone_as_string(&om.validation_response_body) - .contains("Validation panicked for rule test.panic: validator blew up") - ); + let body = validation_body::clone_as_string(&om.validation_response_body); + assert!(body.contains("Validation panicked for rule test.panic")); + // The raw panic payload must never leak into the user-visible body. + assert!(!body.contains("validator blew up")); assert_eq!(success_count.load(Ordering::Relaxed), 0); assert_eq!(fail_count.load(Ordering::Relaxed), 1); let cached = cache.get(&cache_key).expect("panic result should be cached"); assert!(!cached.is_valid); assert_eq!(cached.status, StatusCode::INTERNAL_SERVER_ERROR); - assert!( - validation_body::clone_as_string(&cached.body) - .contains("Validation panicked for rule test.panic: validator blew up") - ); + let cached_body = validation_body::clone_as_string(&cached.body); + assert!(cached_body.contains("Validation panicked for rule test.panic")); + // The cached body must not retain the raw panic payload either. + assert!(!cached_body.contains("validator blew up")); } } From 138eefe2b99146ef07201d3952a8276b5ec2c45e Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Fri, 22 May 2026 11:50:47 -0400 Subject: [PATCH 5/8] Fixed failed to spawn thread: Os { code: 11, kind: WouldBlock } panics during validation-heavy scans. Kingfisher built two Tokio runtimes (main + artifact-fetcher) that each defaulted to 512 blocking threads, which combined with Rayon pools and per-call spawns could exceed the OS per-user thread limit (RLIMIT_NPROC, default 8000 on macOS). Both runtimes now cap their blocking pools at max(num_jobs * 8, 32), and on Unix the soft RLIMIT_NPROC is raised to the hard limit at startup so users don't need to tune ulimit -u manually. --- CHANGELOG.md | 3 +++ Cargo.lock | 3 ++- Cargo.toml | 5 ++++- src/main.rs | 31 +++++++++++++++++++++++++++++++ src/scanner/runner.rs | 6 ++++-- src/util.rs | 26 ++++++++++++++++++++++++++ 6 files changed, 70 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d81983..58eb07c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ All notable changes to this project will be documented in this file. +## [v1.101.0] +- Fixed `failed to spawn thread: Os { code: 11, kind: WouldBlock }` panics during validation-heavy scans. Kingfisher built two Tokio runtimes (main + artifact-fetcher) that each defaulted to 512 blocking threads, which combined with Rayon pools and per-call spawns could exceed the OS per-user thread limit (`RLIMIT_NPROC`, default 8000 on macOS). Both runtimes now cap their blocking pools at `min(max(num_jobs * 8, 32), 256)`, and on Unix the soft `RLIMIT_NPROC` is raised to the hard limit before Kingfisher starts its worker threads so users don't need to tune `ulimit -u` manually. + ## [v1.100.0] - Archive scanning now reaches inside Android/iOS app packages: added `apk`, `aab`, and `ipa` to the recognized ZIP-based archive formats so secrets embedded in APK/AAB/IPA contents (e.g. `classes*.dex`, `res/values/strings.xml`) are extracted and matched. - Git repository scans now extract archive blobs encountered in the object database, not just on the filesystem. Previously a `.zip`/`.jar`/`.apk`/`.tar.gz` committed to a repo was scanned as raw compressed bytes, so secrets inside it were invisible. The git enumerator fans each archive entry out as a synthetic `!` blob with the original commit metadata. Honors `--no-extract-archives` for opt-out. diff --git a/Cargo.lock b/Cargo.lock index 85f94fc..ad94549 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4960,7 +4960,7 @@ dependencies = [ [[package]] name = "kingfisher" -version = "1.100.0" +version = "1.101.0" dependencies = [ "anyhow", "asar", @@ -5023,6 +5023,7 @@ dependencies = [ "kingfisher-core", "kingfisher-rules", "kingfisher-scanner", + "libc", "liquid", "liquid-core", "lzma-rs", diff --git a/Cargo.toml b/Cargo.toml index e6876a9..ec096e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ http = "1.4" [package] name = "kingfisher" -version = "1.100.0" +version = "1.101.0" description = "MongoDB's blazingly fast and accurate secret scanning and validation tool" edition.workspace = true rust-version.workspace = true @@ -233,6 +233,9 @@ h2 = "0.4.13" version = "0.6" optional = true +[target.'cfg(unix)'.dependencies] +libc = "0.2" + [features] default = ["use-mimalloc"] use-mimalloc = [] diff --git a/src/main.rs b/src/main.rs index 4cda1d7..b89dc55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -80,6 +80,7 @@ use kingfisher::{ rules_database::RulesDatabase, scanner::{load_and_record_rules, run_scan}, update::{check_for_update_async, rewrite_argv_for_reexec}, + util::tokio_blocking_threads_limit, validation::set_user_agent_suffix, }; use serde_json::json; @@ -102,6 +103,7 @@ use crate::cli::commands::{ }; fn main() -> anyhow::Result<()> { + raise_nproc_soft_limit(); color_backtrace::install(); // Run the real entry point on a thread with an explicit, larger stack so that @@ -122,6 +124,30 @@ enum AsyncMainOutcome { Reexec, } +/// Best-effort raise of the soft `RLIMIT_NPROC` (per-user thread/process cap) +/// to the current hard limit. Many users hit `pthread_create` failures +/// (`EAGAIN` / `WouldBlock`) under heavy validation because the default soft +/// limit on macOS is well below the hard limit. Failures here are intentionally +/// silent — this is a quality-of-life nudge, not a correctness requirement. +#[cfg(unix)] +fn raise_nproc_soft_limit() { + // SAFETY: getrlimit/setrlimit are async-signal-safe and take a properly + // sized `rlimit` we own. + unsafe { + let mut rl = libc::rlimit { rlim_cur: 0, rlim_max: 0 }; + if libc::getrlimit(libc::RLIMIT_NPROC, &mut rl) != 0 { + return; + } + if rl.rlim_cur < rl.rlim_max { + let new = libc::rlimit { rlim_cur: rl.rlim_max, rlim_max: rl.rlim_max }; + let _ = libc::setrlimit(libc::RLIMIT_NPROC, &new); + } + } +} + +#[cfg(not(unix))] +fn raise_nproc_soft_limit() {} + fn run() -> anyhow::Result<()> { // Rustls 0.23 requires an explicit crypto provider selection when multiple // providers are present in the dependency graph. @@ -160,8 +186,13 @@ fn run() -> anyhow::Result<()> { // Worker threads need larger stacks because async state machines (validation // pipeline) can produce large poll stack frames. 8 MiB is sufficient now that // the validators are split into separate async fns. + // Bound the blocking-thread pool. Tokio's default is 512 per runtime; the + // helper scales with --jobs but caps each runtime below that default so the + // main and artifact-fetcher runtimes cannot both grow huge blocking pools. + let max_blocking = tokio_blocking_threads_limit(num_jobs); let runtime = Builder::new_multi_thread() .worker_threads(num_jobs) + .max_blocking_threads(max_blocking) .thread_stack_size(8 * 1024 * 1024) // 8 MiB per worker .enable_all() .build() diff --git a/src/scanner/runner.rs b/src/scanner/runner.rs index 384e0d0..209ca20 100644 --- a/src/scanner/runner.rs +++ b/src/scanner/runner.rs @@ -42,7 +42,7 @@ use crate::{ run_secret_validation, save_docker_images, summary::{compute_scan_totals, print_scan_summary}, }, - util::set_redaction_enabled, + util::{set_redaction_enabled, tokio_blocking_threads_limit}, validation::CachedResponse, validation_rate_limit::ValidationRateLimiter, }; @@ -403,8 +403,10 @@ fn start_artifact_fetching( std::thread::Builder::new() .name("artifact-fetcher".to_string()) .spawn(move || -> Result<()> { + let workers = args.num_jobs.max(1); let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(args.num_jobs.max(1)) + .worker_threads(workers) + .max_blocking_threads(tokio_blocking_threads_limit(workers)) .enable_all() .build() .context("Failed to build artifact-fetcher runtime")?; diff --git a/src/util.rs b/src/util.rs index 7c8e233..57a78bf 100644 --- a/src/util.rs +++ b/src/util.rs @@ -16,6 +16,22 @@ use rand::RngExt; static APP_SALT: LazyLock = LazyLock::new(|| generate_salt()); static REDACTION_ENABLED: AtomicBool = AtomicBool::new(false); +const MIN_TOKIO_BLOCKING_THREADS: usize = 32; +const TOKIO_BLOCKING_THREADS_PER_JOB: usize = 8; +const MAX_TOKIO_BLOCKING_THREADS: usize = 256; + +/// Per-runtime cap for Tokio's blocking thread pool. +/// +/// Tokio defaults to 512 blocking threads per runtime. Kingfisher can run the +/// main and artifact-fetcher runtimes at the same time, so keeping each runtime +/// below that default avoids runaway thread growth during validation-heavy scans. +pub fn tokio_blocking_threads_limit(num_jobs: usize) -> usize { + num_jobs + .saturating_mul(TOKIO_BLOCKING_THREADS_PER_JOB) + .max(MIN_TOKIO_BLOCKING_THREADS) + .min(MAX_TOKIO_BLOCKING_THREADS) +} + /// Interns a string once and returns a `'static` reference to it. pub fn intern(s: &str) -> &'static str { static INTERN: LazyLock> = LazyLock::new(|| DashSet::with_capacity(512)); @@ -156,6 +172,16 @@ mod tests { use super::{is_test_like_path, *}; + #[test] + fn tokio_blocking_threads_limit_scales_and_caps() { + assert_eq!(tokio_blocking_threads_limit(0), 32); + assert_eq!(tokio_blocking_threads_limit(1), 32); + assert_eq!(tokio_blocking_threads_limit(4), 32); + assert_eq!(tokio_blocking_threads_limit(8), 64); + assert_eq!(tokio_blocking_threads_limit(32), 256); + assert_eq!(tokio_blocking_threads_limit(usize::MAX), 256); + } + /// Paths that **should** be classified as test-like. #[test] fn test_is_test_like_path_positive() { From 207174e1a82f8102a031e134f883391ec244130a Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Fri, 22 May 2026 12:37:37 -0400 Subject: [PATCH 6/8] merged 2 PRs and updated changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58eb07c..c71f32a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,9 @@ All notable changes to this project will be documented in this file. ## [v1.101.0] -- Fixed `failed to spawn thread: Os { code: 11, kind: WouldBlock }` panics during validation-heavy scans. Kingfisher built two Tokio runtimes (main + artifact-fetcher) that each defaulted to 512 blocking threads, which combined with Rayon pools and per-call spawns could exceed the OS per-user thread limit (`RLIMIT_NPROC`, default 8000 on macOS). Both runtimes now cap their blocking pools at `min(max(num_jobs * 8, 32), 256)`, and on Unix the soft `RLIMIT_NPROC` is raised to the hard limit before Kingfisher starts its worker threads so users don't need to tune `ulimit -u` manually. +- Fixed asymmetric JWT validation panics by using a single `jsonwebtoken` crypto backend and adding RS256 regression coverage. Thanks @AgentEnder. [#386](https://github.com/mongodb/kingfisher/pull/386) +- Validator panics now fail that validation result instead of crashing the scan, with panic payloads kept out of cached and user-visible validation responses. Thanks @AgentEnder. [#387](https://github.com/mongodb/kingfisher/pull/387) +- Reduced `failed to spawn thread` errors in validation-heavy scans by capping Tokio blocking pools for the main and artifact-fetcher runtimes and raising the Unix soft `RLIMIT_NPROC` before worker startup. ## [v1.100.0] - Archive scanning now reaches inside Android/iOS app packages: added `apk`, `aab`, and `ipa` to the recognized ZIP-based archive formats so secrets embedded in APK/AAB/IPA contents (e.g. `classes*.dex`, `res/values/strings.xml`) are extracted and matched. From bb7fea155e5990ab336598d3f24d49a4c1d22c32 Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Fri, 22 May 2026 14:17:59 -0400 Subject: [PATCH 7/8] merged 2 PRs and updated changelog --- docs-site/docs/changelog.md | 4 ++++ src/scanner/validation.rs | 16 ++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/docs-site/docs/changelog.md b/docs-site/docs/changelog.md index 42d7283..4d8af32 100644 --- a/docs-site/docs/changelog.md +++ b/docs-site/docs/changelog.md @@ -6,6 +6,10 @@ description: "Kingfisher release history: new features, rules, bug fixes, and im # Changelog All notable changes to this project will be documented in this file. +## [v1.101.0] +- Fixed asymmetric JWT validation panics by using a single `jsonwebtoken` crypto backend and adding RS256 regression coverage. Thanks @AgentEnder. [#386](https://github.com/mongodb/kingfisher/pull/386) +- Validator panics now fail that validation result instead of crashing the scan, with panic payloads kept out of cached and user-visible validation responses. Thanks @AgentEnder. [#387](https://github.com/mongodb/kingfisher/pull/387) +- Reduced `failed to spawn thread` errors in validation-heavy scans by capping Tokio blocking pools for the main and artifact-fetcher runtimes and raising the Unix soft `RLIMIT_NPROC` before worker startup. ## [v1.100.0] - Archive scanning now reaches inside Android/iOS app packages: added `apk`, `aab`, and `ipa` to the recognized ZIP-based archive formats so secrets embedded in APK/AAB/IPA contents (e.g. `classes*.dex`, `res/values/strings.xml`) are extracted and matched. diff --git a/src/scanner/validation.rs b/src/scanner/validation.rs index 6ab7363..b547fb3 100644 --- a/src/scanner/validation.rs +++ b/src/scanner/validation.rs @@ -17,7 +17,7 @@ use liquid::Parser; use reqwest::StatusCode; use rustc_hash::{FxHashMap, FxHashSet}; use tokio::{sync::Notify, time::timeout}; -use tracing::{trace, warn}; +use tracing::{debug, trace, warn}; use crate::{ access_map::AccessMapRequest, @@ -989,19 +989,23 @@ fn apply_validation_outcome( ValidationOutcome::Panicked(panic_message) => { // The panic payload can embed secret material (e.g. a token captured // in a debug string), so it must never reach the cached or - // user-visible body. Emit the detail through structured logging - // (truncated), and keep the visible body to the stable rule id. + // user-visible body. Keep WARN free of the payload too; truncated + // panic detail is only emitted at DEBUG for troubleshooting. warn!( rule_id = %om.rule.id(), - panic = %truncate_for_log(&panic_message), "validator panicked; marking match as failed", ); + debug!( + rule_id = %om.rule.id(), + panic = %truncate_for_log(&panic_message), + "validator panic detail", + ); om.validation_success = false; om.validation_response_body = validation_body::from_string(format!( "Validation panicked for rule {}", om.rule.id() )); - om.validation_response_status = http::StatusCode::INTERNAL_SERVER_ERROR; + om.validation_response_status = StatusCode::INTERNAL_SERVER_ERROR; fail_count.fetch_add(1, Ordering::Relaxed); cache.insert( cache_key.to_owned(), @@ -1016,7 +1020,7 @@ fn apply_validation_outcome( ValidationOutcome::TimedOut => { om.validation_success = false; om.validation_response_body = validation_body::from_string("Validation timed out"); - om.validation_response_status = http::StatusCode::REQUEST_TIMEOUT; + om.validation_response_status = StatusCode::REQUEST_TIMEOUT; fail_count.fetch_add(1, Ordering::Relaxed); } } From a0d2fa3611952fe379a3c5a1b055320221f6279e Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Fri, 22 May 2026 13:15:59 -0700 Subject: [PATCH 8/8] merged 2 PRs and updated changelog --- src/main.rs | 6 +- src/scanner/validation.rs | 69 ++++++++++------------- src/util.rs | 3 +- src/validation.rs | 114 ++++++++++++++++++++++---------------- 4 files changed, 101 insertions(+), 91 deletions(-) diff --git a/src/main.rs b/src/main.rs index b89dc55..b7617b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -131,8 +131,10 @@ enum AsyncMainOutcome { /// silent — this is a quality-of-life nudge, not a correctness requirement. #[cfg(unix)] fn raise_nproc_soft_limit() { - // SAFETY: getrlimit/setrlimit are async-signal-safe and take a properly - // sized `rlimit` we own. + // SAFETY: `getrlimit`/`setrlimit` are FFI calls. We pass pointers to + // properly initialized `libc::rlimit` values with the correct layout, only + // read `rl` after `getrlimit` reports success, and treat `setrlimit` + // failure as best-effort by ignoring its return value. unsafe { let mut rl = libc::rlimit { rlim_cur: 0, rlim_max: 0 }; if libc::getrlimit(libc::RLIMIT_NPROC, &mut rl) != 0 { diff --git a/src/scanner/validation.rs b/src/scanner/validation.rs index b547fb3..728c588 100644 --- a/src/scanner/validation.rs +++ b/src/scanner/validation.rs @@ -16,7 +16,7 @@ use indicatif::{ProgressBar, ProgressStyle}; use liquid::Parser; use reqwest::StatusCode; use rustc_hash::{FxHashMap, FxHashSet}; -use tokio::{sync::Notify, time::timeout}; +use tokio::sync::Notify; use tracing::{debug, trace, warn}; use crate::{ @@ -901,25 +901,22 @@ async fn validate_single( } // If we reach here, we're the first task to validate this key // Perform validation - let outcome = ValidationOutcome::from_timeout_result( - timeout( - validation_timeout, - catch_validation_panic( - validate_single_match( - om, - parser, - clients, - dep_vars, - missing_deps, - cache2, - validation_timeout, - validation_retries, - rate_limiter, - provider_endpoints.as_ref(), - max_body_len, - ) - .boxed(), - ), + let outcome = ValidationOutcome::from_panic_result( + catch_validation_panic( + validate_single_match( + om, + parser, + clients, + dep_vars, + missing_deps, + cache2, + validation_timeout, + validation_retries, + rate_limiter, + provider_endpoints.as_ref(), + max_body_len, + ) + .boxed(), ) .await, ); @@ -935,8 +932,9 @@ async fn validate_single( /// Result of attempting to validate a single match. /// -/// Flattens the `timeout(catch_validation_panic(..))` nesting into one -/// self-describing enum so call sites and signatures stay readable. +/// Flattens panic handling into a self-describing enum so call sites and +/// signatures stay readable. Validation timeouts are handled inside +/// `validate_single_match`, where the module-local de-dupe state can be cleaned. enum ValidationOutcome { /// Validation ran to completion; the match's own fields describe whether it /// succeeded or failed. @@ -944,18 +942,13 @@ enum ValidationOutcome { /// Validation panicked. The payload is captured for logging only and must /// never be surfaced to the user or cache (it may embed secret material). Panicked(String), - /// Validation exceeded the configured timeout. - TimedOut, } impl ValidationOutcome { - fn from_timeout_result( - result: std::result::Result, tokio::time::error::Elapsed>, - ) -> Self { + fn from_panic_result(result: std::result::Result<(), String>) -> Self { match result { - Ok(Ok(())) => ValidationOutcome::Completed, - Ok(Err(panic_message)) => ValidationOutcome::Panicked(panic_message), - Err(_) => ValidationOutcome::TimedOut, + Ok(()) => ValidationOutcome::Completed, + Err(panic_message) => ValidationOutcome::Panicked(panic_message), } } } @@ -1017,12 +1010,6 @@ fn apply_validation_outcome( }, ); } - ValidationOutcome::TimedOut => { - om.validation_success = false; - om.validation_response_body = validation_body::from_string("Validation timed out"); - om.validation_response_status = StatusCode::REQUEST_TIMEOUT; - fail_count.fetch_add(1, Ordering::Relaxed); - } } } @@ -1793,10 +1780,12 @@ mod tests { let success_count = AtomicUsize::new(0); let fail_count = AtomicUsize::new(0); - let outcome = ValidationOutcome::from_timeout_result(Ok(catch_validation_panic(async { - panic!("validator blew up"); - }) - .await)); + let outcome = ValidationOutcome::from_panic_result( + catch_validation_panic(async { + panic!("validator blew up"); + }) + .await, + ); apply_validation_outcome(&mut om, &cache_key, outcome, &success_count, &fail_count, &cache); diff --git a/src/util.rs b/src/util.rs index 57a78bf..9b6c1b8 100644 --- a/src/util.rs +++ b/src/util.rs @@ -28,8 +28,7 @@ const MAX_TOKIO_BLOCKING_THREADS: usize = 256; pub fn tokio_blocking_threads_limit(num_jobs: usize) -> usize { num_jobs .saturating_mul(TOKIO_BLOCKING_THREADS_PER_JOB) - .max(MIN_TOKIO_BLOCKING_THREADS) - .min(MAX_TOKIO_BLOCKING_THREADS) + .clamp(MIN_TOKIO_BLOCKING_THREADS, MAX_TOKIO_BLOCKING_THREADS) } /// Interns a string once and returns a `'static` reference to it. diff --git a/src/validation.rs b/src/validation.rs index d390f8a..f6041ea 100644 --- a/src/validation.rs +++ b/src/validation.rs @@ -2,6 +2,7 @@ use std::{ collections::BTreeMap, fs, hash::{Hash, Hasher}, + panic::AssertUnwindSafe, sync::Arc, time::{Duration, Instant}, }; @@ -17,7 +18,7 @@ use liquid_core::{Value, ValueView}; use reqwest::{Client, Url, header, header::HeaderValue, multipart}; use rustc_hash::{FxHashMap, FxHashSet}; use tokio::{sync::Notify, time}; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; use crate::{ cli::global::TlsMode, @@ -303,6 +304,24 @@ fn validation_dedup_key(m: &OwnedBlobMatch) -> u64 { static VALIDATION_CACHE: OnceLock> = OnceLock::new(); static IN_FLIGHT: OnceLock>> = OnceLock::new(); +fn cache_validation_result(fp: u64, m: &OwnedBlobMatch) { + VALIDATION_CACHE.get_or_init(DashMap::new).insert( + fp, + CachedResponse { + body: m.validation_response_body.clone(), + status: m.validation_response_status, + is_valid: m.validation_success, + timestamp: Instant::now(), + }, + ); +} + +fn clear_in_flight_validation(fp: u64) { + if let Some((_, notify)) = IN_FLIGHT.get_or_init(DashMap::new).remove(&fp) { + notify.notify_waiters(); + } +} + /// Call this once near program start (e.g. in `main()`) pub fn init_validation_caches() { VALIDATION_CACHE.set(DashMap::new()).ok(); @@ -457,45 +476,56 @@ pub async fn validate_single_match( max_body_len: usize, ) { let fp = validation_dedup_key(m); + // Keep the unwind boundary inside this module so the process-wide + // validation de-dupe state is cleared before the caller observes a panic. + // The panic branch below overwrites the match with a deterministic failure. let timeout_result = time::timeout( validation_timeout, - timed_validate_single_match( - m, - parser, - clients, - dependent_variables, - missing_dependencies, - cache, - validation_timeout, - validation_retries, - rate_limiter, - provider_endpoints, - max_body_len, + AssertUnwindSafe( + timed_validate_single_match( + m, + parser, + clients, + dependent_variables, + missing_dependencies, + cache, + validation_timeout, + validation_retries, + rate_limiter, + provider_endpoints, + max_body_len, + ) + .boxed(), ) - .boxed(), + .catch_unwind(), ) .await; - if timeout_result.is_err() { - m.validation_success = false; - m.validation_response_body = validation_body::from_string(format!( - "Validation timed out after {} seconds", - validation_timeout.as_secs() - )); - m.validation_response_status = StatusCode::REQUEST_TIMEOUT; - - VALIDATION_CACHE.get_or_init(DashMap::new).insert( - fp, - CachedResponse { - body: m.validation_response_body.clone(), - status: m.validation_response_status, - is_valid: false, - timestamp: Instant::now(), - }, - ); - - if let Some((_, notify)) = IN_FLIGHT.get_or_init(DashMap::new).remove(&fp) { - notify.notify_waiters(); + match timeout_result { + Ok(Ok(())) => {} + Ok(Err(_panic_payload)) => { + warn!( + rule_id = %m.rule.syntax().id, + "validator panicked; marking match as failed", + ); + m.validation_success = false; + m.validation_response_body = validation_body::from_string(format!( + "Validation panicked for rule {}", + m.rule.syntax().id + )); + m.validation_response_status = StatusCode::INTERNAL_SERVER_ERROR; + cache_validation_result(fp, m); + clear_in_flight_validation(fp); + } + Err(_) => { + m.validation_success = false; + m.validation_response_body = validation_body::from_string(format!( + "Validation timed out after {} seconds", + validation_timeout.as_secs() + )); + m.validation_response_status = StatusCode::REQUEST_TIMEOUT; + cache_validation_result(fp, m); + clear_in_flight_validation(fp); } } } @@ -544,22 +574,12 @@ async fn timed_validate_single_match<'a>( } return; } - let notify = Arc::new(Notify::new()); - IN_FLIGHT.get().unwrap().insert(fp, notify.clone()); + IN_FLIGHT.get().unwrap().insert(fp, Arc::new(Notify::new())); // helper to persist result + notify waiters let commit_and_return = |m: &OwnedBlobMatch| { - VALIDATION_CACHE.get().unwrap().insert( - fp, - CachedResponse { - body: m.validation_response_body.clone(), - status: m.validation_response_status, - is_valid: m.validation_success, - timestamp: Instant::now(), - }, - ); - IN_FLIGHT.get().unwrap().remove(&fp); - notify.notify_waiters(); + cache_validation_result(fp, m); + clear_in_flight_validation(fp); }; // ──────────────────────────────────────────────────────────