From a0d2fa3611952fe379a3c5a1b055320221f6279e Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Fri, 22 May 2026 13:15:59 -0700 Subject: [PATCH] merged 2 PRs and updated changelog --- src/main.rs | 6 +- src/scanner/validation.rs | 69 ++++++++++------------- src/util.rs | 3 +- src/validation.rs | 114 ++++++++++++++++++++++---------------- 4 files changed, 101 insertions(+), 91 deletions(-) diff --git a/src/main.rs b/src/main.rs index b89dc55..b7617b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -131,8 +131,10 @@ enum AsyncMainOutcome { /// silent — this is a quality-of-life nudge, not a correctness requirement. #[cfg(unix)] fn raise_nproc_soft_limit() { - // SAFETY: getrlimit/setrlimit are async-signal-safe and take a properly - // sized `rlimit` we own. + // SAFETY: `getrlimit`/`setrlimit` are FFI calls. We pass pointers to + // properly initialized `libc::rlimit` values with the correct layout, only + // read `rl` after `getrlimit` reports success, and treat `setrlimit` + // failure as best-effort by ignoring its return value. unsafe { let mut rl = libc::rlimit { rlim_cur: 0, rlim_max: 0 }; if libc::getrlimit(libc::RLIMIT_NPROC, &mut rl) != 0 { diff --git a/src/scanner/validation.rs b/src/scanner/validation.rs index b547fb3..728c588 100644 --- a/src/scanner/validation.rs +++ b/src/scanner/validation.rs @@ -16,7 +16,7 @@ use indicatif::{ProgressBar, ProgressStyle}; use liquid::Parser; use reqwest::StatusCode; use rustc_hash::{FxHashMap, FxHashSet}; -use tokio::{sync::Notify, time::timeout}; +use tokio::sync::Notify; use tracing::{debug, trace, warn}; use crate::{ @@ -901,25 +901,22 @@ async fn validate_single( } // If we reach here, we're the first task to validate this key // Perform validation - let outcome = ValidationOutcome::from_timeout_result( - timeout( - validation_timeout, - catch_validation_panic( - validate_single_match( - om, - parser, - clients, - dep_vars, - missing_deps, - cache2, - validation_timeout, - validation_retries, - rate_limiter, - provider_endpoints.as_ref(), - max_body_len, - ) - .boxed(), - ), + let outcome = ValidationOutcome::from_panic_result( + catch_validation_panic( + validate_single_match( + om, + parser, + clients, + dep_vars, + missing_deps, + cache2, + validation_timeout, + validation_retries, + rate_limiter, + provider_endpoints.as_ref(), + max_body_len, + ) + .boxed(), ) .await, ); @@ -935,8 +932,9 @@ async fn validate_single( /// Result of attempting to validate a single match. /// -/// Flattens the `timeout(catch_validation_panic(..))` nesting into one -/// self-describing enum so call sites and signatures stay readable. +/// Flattens panic handling into a self-describing enum so call sites and +/// signatures stay readable. Validation timeouts are handled inside +/// `validate_single_match`, where the module-local de-dupe state can be cleaned. enum ValidationOutcome { /// Validation ran to completion; the match's own fields describe whether it /// succeeded or failed. @@ -944,18 +942,13 @@ enum ValidationOutcome { /// Validation panicked. The payload is captured for logging only and must /// never be surfaced to the user or cache (it may embed secret material). Panicked(String), - /// Validation exceeded the configured timeout. - TimedOut, } impl ValidationOutcome { - fn from_timeout_result( - result: std::result::Result, tokio::time::error::Elapsed>, - ) -> Self { + fn from_panic_result(result: std::result::Result<(), String>) -> Self { match result { - Ok(Ok(())) => ValidationOutcome::Completed, - Ok(Err(panic_message)) => ValidationOutcome::Panicked(panic_message), - Err(_) => ValidationOutcome::TimedOut, + Ok(()) => ValidationOutcome::Completed, + Err(panic_message) => ValidationOutcome::Panicked(panic_message), } } } @@ -1017,12 +1010,6 @@ fn apply_validation_outcome( }, ); } - ValidationOutcome::TimedOut => { - om.validation_success = false; - om.validation_response_body = validation_body::from_string("Validation timed out"); - om.validation_response_status = StatusCode::REQUEST_TIMEOUT; - fail_count.fetch_add(1, Ordering::Relaxed); - } } } @@ -1793,10 +1780,12 @@ mod tests { let success_count = AtomicUsize::new(0); let fail_count = AtomicUsize::new(0); - let outcome = ValidationOutcome::from_timeout_result(Ok(catch_validation_panic(async { - panic!("validator blew up"); - }) - .await)); + let outcome = ValidationOutcome::from_panic_result( + catch_validation_panic(async { + panic!("validator blew up"); + }) + .await, + ); apply_validation_outcome(&mut om, &cache_key, outcome, &success_count, &fail_count, &cache); diff --git a/src/util.rs b/src/util.rs index 57a78bf..9b6c1b8 100644 --- a/src/util.rs +++ b/src/util.rs @@ -28,8 +28,7 @@ const MAX_TOKIO_BLOCKING_THREADS: usize = 256; pub fn tokio_blocking_threads_limit(num_jobs: usize) -> usize { num_jobs .saturating_mul(TOKIO_BLOCKING_THREADS_PER_JOB) - .max(MIN_TOKIO_BLOCKING_THREADS) - .min(MAX_TOKIO_BLOCKING_THREADS) + .clamp(MIN_TOKIO_BLOCKING_THREADS, MAX_TOKIO_BLOCKING_THREADS) } /// Interns a string once and returns a `'static` reference to it. diff --git a/src/validation.rs b/src/validation.rs index d390f8a..f6041ea 100644 --- a/src/validation.rs +++ b/src/validation.rs @@ -2,6 +2,7 @@ use std::{ collections::BTreeMap, fs, hash::{Hash, Hasher}, + panic::AssertUnwindSafe, sync::Arc, time::{Duration, Instant}, }; @@ -17,7 +18,7 @@ use liquid_core::{Value, ValueView}; use reqwest::{Client, Url, header, header::HeaderValue, multipart}; use rustc_hash::{FxHashMap, FxHashSet}; use tokio::{sync::Notify, time}; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; use crate::{ cli::global::TlsMode, @@ -303,6 +304,24 @@ fn validation_dedup_key(m: &OwnedBlobMatch) -> u64 { static VALIDATION_CACHE: OnceLock> = OnceLock::new(); static IN_FLIGHT: OnceLock>> = OnceLock::new(); +fn cache_validation_result(fp: u64, m: &OwnedBlobMatch) { + VALIDATION_CACHE.get_or_init(DashMap::new).insert( + fp, + CachedResponse { + body: m.validation_response_body.clone(), + status: m.validation_response_status, + is_valid: m.validation_success, + timestamp: Instant::now(), + }, + ); +} + +fn clear_in_flight_validation(fp: u64) { + if let Some((_, notify)) = IN_FLIGHT.get_or_init(DashMap::new).remove(&fp) { + notify.notify_waiters(); + } +} + /// Call this once near program start (e.g. in `main()`) pub fn init_validation_caches() { VALIDATION_CACHE.set(DashMap::new()).ok(); @@ -457,45 +476,56 @@ pub async fn validate_single_match( max_body_len: usize, ) { let fp = validation_dedup_key(m); + // Keep the unwind boundary inside this module so the process-wide + // validation de-dupe state is cleared before the caller observes a panic. + // The panic branch below overwrites the match with a deterministic failure. let timeout_result = time::timeout( validation_timeout, - timed_validate_single_match( - m, - parser, - clients, - dependent_variables, - missing_dependencies, - cache, - validation_timeout, - validation_retries, - rate_limiter, - provider_endpoints, - max_body_len, + AssertUnwindSafe( + timed_validate_single_match( + m, + parser, + clients, + dependent_variables, + missing_dependencies, + cache, + validation_timeout, + validation_retries, + rate_limiter, + provider_endpoints, + max_body_len, + ) + .boxed(), ) - .boxed(), + .catch_unwind(), ) .await; - if timeout_result.is_err() { - m.validation_success = false; - m.validation_response_body = validation_body::from_string(format!( - "Validation timed out after {} seconds", - validation_timeout.as_secs() - )); - m.validation_response_status = StatusCode::REQUEST_TIMEOUT; - - VALIDATION_CACHE.get_or_init(DashMap::new).insert( - fp, - CachedResponse { - body: m.validation_response_body.clone(), - status: m.validation_response_status, - is_valid: false, - timestamp: Instant::now(), - }, - ); - - if let Some((_, notify)) = IN_FLIGHT.get_or_init(DashMap::new).remove(&fp) { - notify.notify_waiters(); + match timeout_result { + Ok(Ok(())) => {} + Ok(Err(_panic_payload)) => { + warn!( + rule_id = %m.rule.syntax().id, + "validator panicked; marking match as failed", + ); + m.validation_success = false; + m.validation_response_body = validation_body::from_string(format!( + "Validation panicked for rule {}", + m.rule.syntax().id + )); + m.validation_response_status = StatusCode::INTERNAL_SERVER_ERROR; + cache_validation_result(fp, m); + clear_in_flight_validation(fp); + } + Err(_) => { + m.validation_success = false; + m.validation_response_body = validation_body::from_string(format!( + "Validation timed out after {} seconds", + validation_timeout.as_secs() + )); + m.validation_response_status = StatusCode::REQUEST_TIMEOUT; + cache_validation_result(fp, m); + clear_in_flight_validation(fp); } } } @@ -544,22 +574,12 @@ async fn timed_validate_single_match<'a>( } return; } - let notify = Arc::new(Notify::new()); - IN_FLIGHT.get().unwrap().insert(fp, notify.clone()); + IN_FLIGHT.get().unwrap().insert(fp, Arc::new(Notify::new())); // helper to persist result + notify waiters let commit_and_return = |m: &OwnedBlobMatch| { - VALIDATION_CACHE.get().unwrap().insert( - fp, - CachedResponse { - body: m.validation_response_body.clone(), - status: m.validation_response_status, - is_valid: m.validation_success, - timestamp: Instant::now(), - }, - ); - IN_FLIGHT.get().unwrap().remove(&fp); - notify.notify_waiters(); + cache_validation_result(fp, m); + clear_in_flight_validation(fp); }; // ──────────────────────────────────────────────────────────