merged 2 PRs and updated changelog

This commit is contained in:
Mick Grove 2026-05-22 13:15:59 -07:00
commit a0d2fa3611
4 changed files with 101 additions and 91 deletions

View file

@ -131,8 +131,10 @@ enum AsyncMainOutcome {
/// silent — this is a quality-of-life nudge, not a correctness requirement.
#[cfg(unix)]
fn raise_nproc_soft_limit() {
// SAFETY: getrlimit/setrlimit are async-signal-safe and take a properly
// sized `rlimit` we own.
// SAFETY: `getrlimit`/`setrlimit` are FFI calls. We pass pointers to
// properly initialized `libc::rlimit` values with the correct layout, only
// read `rl` after `getrlimit` reports success, and treat `setrlimit`
// failure as best-effort by ignoring its return value.
unsafe {
let mut rl = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
if libc::getrlimit(libc::RLIMIT_NPROC, &mut rl) != 0 {

View file

@ -16,7 +16,7 @@ use indicatif::{ProgressBar, ProgressStyle};
use liquid::Parser;
use reqwest::StatusCode;
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::{sync::Notify, time::timeout};
use tokio::sync::Notify;
use tracing::{debug, trace, warn};
use crate::{
@ -901,25 +901,22 @@ async fn validate_single(
}
// If we reach here, we're the first task to validate this key
// Perform validation
let outcome = ValidationOutcome::from_timeout_result(
timeout(
validation_timeout,
catch_validation_panic(
validate_single_match(
om,
parser,
clients,
dep_vars,
missing_deps,
cache2,
validation_timeout,
validation_retries,
rate_limiter,
provider_endpoints.as_ref(),
max_body_len,
)
.boxed(),
),
let outcome = ValidationOutcome::from_panic_result(
catch_validation_panic(
validate_single_match(
om,
parser,
clients,
dep_vars,
missing_deps,
cache2,
validation_timeout,
validation_retries,
rate_limiter,
provider_endpoints.as_ref(),
max_body_len,
)
.boxed(),
)
.await,
);
@ -935,8 +932,9 @@ async fn validate_single(
/// Result of attempting to validate a single match.
///
/// Flattens the `timeout(catch_validation_panic(..))` nesting into one
/// self-describing enum so call sites and signatures stay readable.
/// Flattens panic handling into a self-describing enum so call sites and
/// signatures stay readable. Validation timeouts are handled inside
/// `validate_single_match`, where the module-local de-dupe state can be cleaned.
enum ValidationOutcome {
/// Validation ran to completion; the match's own fields describe whether it
/// succeeded or failed.
@ -944,18 +942,13 @@ enum ValidationOutcome {
/// Validation panicked. The payload is captured for logging only and must
/// never be surfaced to the user or cache (it may embed secret material).
Panicked(String),
/// Validation exceeded the configured timeout.
TimedOut,
}
impl ValidationOutcome {
fn from_timeout_result(
result: std::result::Result<std::result::Result<(), String>, tokio::time::error::Elapsed>,
) -> Self {
fn from_panic_result(result: std::result::Result<(), String>) -> Self {
match result {
Ok(Ok(())) => ValidationOutcome::Completed,
Ok(Err(panic_message)) => ValidationOutcome::Panicked(panic_message),
Err(_) => ValidationOutcome::TimedOut,
Ok(()) => ValidationOutcome::Completed,
Err(panic_message) => ValidationOutcome::Panicked(panic_message),
}
}
}
@ -1017,12 +1010,6 @@ fn apply_validation_outcome(
},
);
}
ValidationOutcome::TimedOut => {
om.validation_success = false;
om.validation_response_body = validation_body::from_string("Validation timed out");
om.validation_response_status = StatusCode::REQUEST_TIMEOUT;
fail_count.fetch_add(1, Ordering::Relaxed);
}
}
}
@ -1793,10 +1780,12 @@ mod tests {
let success_count = AtomicUsize::new(0);
let fail_count = AtomicUsize::new(0);
let outcome = ValidationOutcome::from_timeout_result(Ok(catch_validation_panic(async {
panic!("validator blew up");
})
.await));
let outcome = ValidationOutcome::from_panic_result(
catch_validation_panic(async {
panic!("validator blew up");
})
.await,
);
apply_validation_outcome(&mut om, &cache_key, outcome, &success_count, &fail_count, &cache);

View file

@ -28,8 +28,7 @@ const MAX_TOKIO_BLOCKING_THREADS: usize = 256;
pub fn tokio_blocking_threads_limit(num_jobs: usize) -> usize {
num_jobs
.saturating_mul(TOKIO_BLOCKING_THREADS_PER_JOB)
.max(MIN_TOKIO_BLOCKING_THREADS)
.min(MAX_TOKIO_BLOCKING_THREADS)
.clamp(MIN_TOKIO_BLOCKING_THREADS, MAX_TOKIO_BLOCKING_THREADS)
}
/// Interns a string once and returns a `'static` reference to it.

View file

@ -2,6 +2,7 @@ use std::{
collections::BTreeMap,
fs,
hash::{Hash, Hasher},
panic::AssertUnwindSafe,
sync::Arc,
time::{Duration, Instant},
};
@ -17,7 +18,7 @@ use liquid_core::{Value, ValueView};
use reqwest::{Client, Url, header, header::HeaderValue, multipart};
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::{sync::Notify, time};
use tracing::{debug, trace};
use tracing::{debug, trace, warn};
use crate::{
cli::global::TlsMode,
@ -303,6 +304,24 @@ fn validation_dedup_key(m: &OwnedBlobMatch) -> u64 {
static VALIDATION_CACHE: OnceLock<DashMap<u64, CachedResponse>> = OnceLock::new();
static IN_FLIGHT: OnceLock<DashMap<u64, Arc<Notify>>> = OnceLock::new();
fn cache_validation_result(fp: u64, m: &OwnedBlobMatch) {
VALIDATION_CACHE.get_or_init(DashMap::new).insert(
fp,
CachedResponse {
body: m.validation_response_body.clone(),
status: m.validation_response_status,
is_valid: m.validation_success,
timestamp: Instant::now(),
},
);
}
fn clear_in_flight_validation(fp: u64) {
if let Some((_, notify)) = IN_FLIGHT.get_or_init(DashMap::new).remove(&fp) {
notify.notify_waiters();
}
}
/// Call this once near program start (e.g. in `main()`)
pub fn init_validation_caches() {
VALIDATION_CACHE.set(DashMap::new()).ok();
@ -457,45 +476,56 @@ pub async fn validate_single_match(
max_body_len: usize,
) {
let fp = validation_dedup_key(m);
// Keep the unwind boundary inside this module so the process-wide
// validation de-dupe state is cleared before the caller observes a panic.
// The panic branch below overwrites the match with a deterministic failure.
let timeout_result = time::timeout(
validation_timeout,
timed_validate_single_match(
m,
parser,
clients,
dependent_variables,
missing_dependencies,
cache,
validation_timeout,
validation_retries,
rate_limiter,
provider_endpoints,
max_body_len,
AssertUnwindSafe(
timed_validate_single_match(
m,
parser,
clients,
dependent_variables,
missing_dependencies,
cache,
validation_timeout,
validation_retries,
rate_limiter,
provider_endpoints,
max_body_len,
)
.boxed(),
)
.boxed(),
.catch_unwind(),
)
.await;
if timeout_result.is_err() {
m.validation_success = false;
m.validation_response_body = validation_body::from_string(format!(
"Validation timed out after {} seconds",
validation_timeout.as_secs()
));
m.validation_response_status = StatusCode::REQUEST_TIMEOUT;
VALIDATION_CACHE.get_or_init(DashMap::new).insert(
fp,
CachedResponse {
body: m.validation_response_body.clone(),
status: m.validation_response_status,
is_valid: false,
timestamp: Instant::now(),
},
);
if let Some((_, notify)) = IN_FLIGHT.get_or_init(DashMap::new).remove(&fp) {
notify.notify_waiters();
match timeout_result {
Ok(Ok(())) => {}
Ok(Err(_panic_payload)) => {
warn!(
rule_id = %m.rule.syntax().id,
"validator panicked; marking match as failed",
);
m.validation_success = false;
m.validation_response_body = validation_body::from_string(format!(
"Validation panicked for rule {}",
m.rule.syntax().id
));
m.validation_response_status = StatusCode::INTERNAL_SERVER_ERROR;
cache_validation_result(fp, m);
clear_in_flight_validation(fp);
}
Err(_) => {
m.validation_success = false;
m.validation_response_body = validation_body::from_string(format!(
"Validation timed out after {} seconds",
validation_timeout.as_secs()
));
m.validation_response_status = StatusCode::REQUEST_TIMEOUT;
cache_validation_result(fp, m);
clear_in_flight_validation(fp);
}
}
}
@ -544,22 +574,12 @@ async fn timed_validate_single_match<'a>(
}
return;
}
let notify = Arc::new(Notify::new());
IN_FLIGHT.get().unwrap().insert(fp, notify.clone());
IN_FLIGHT.get().unwrap().insert(fp, Arc::new(Notify::new()));
// helper to persist result + notify waiters
let commit_and_return = |m: &OwnedBlobMatch| {
VALIDATION_CACHE.get().unwrap().insert(
fp,
CachedResponse {
body: m.validation_response_body.clone(),
status: m.validation_response_status,
is_valid: m.validation_success,
timestamp: Instant::now(),
},
);
IN_FLIGHT.get().unwrap().remove(&fp);
notify.notify_waiters();
cache_validation_result(fp, m);
clear_in_flight_validation(fp);
};
// ──────────────────────────────────────────────────────────