/// Log every 1-in-N repeats of an identical sync failure (the first occurrence
/// always logs).
const REPEAT_LOG_EVERY: u32 = 10;

/// Per-loop log throttling state for background sync: announce recovery after a
/// failure streak, and suppress repeats of an identical failure message so a
/// down hub doesn't write the same warning every cycle.
#[derive(Debug, Default)]
struct SyncLoopLog {
    /// Failures folded in since the last success. Saturates at `u32::MAX`
    /// rather than overflowing.
    consecutive_failures: u32,
    /// Message of the most recent failure; `None` until a failure is folded in
    /// and again after every success.
    last_error: Option<String>,
    /// Identical repeats of `last_error` suppressed since that message last
    /// logged. Reset whenever a repeat logs, so it never exceeds
    /// [`REPEAT_LOG_EVERY`].
    repeats: u32,
}

impl SyncLoopLog {
    /// Fold in a success. Returns `Some(n)` when this ends a streak of `n`
    /// failures — the recovery transition worth announcing — and `None` when
    /// there was no streak to end. All throttling state is cleared, so the next
    /// failure logs immediately even if its message was seen before.
    fn on_success(&mut self) -> Option<u32> {
        let failures = std::mem::take(&mut self.consecutive_failures);
        self.last_error = None;
        self.repeats = 0;
        (failures > 0).then_some(failures)
    }

    /// Fold in a failure. Returns whether this occurrence should log at warn
    /// level: a new message always does; an identical repeat only every
    /// [`REPEAT_LOG_EVERY`]-th time.
    fn on_failure(&mut self, msg: &str) -> bool {
        // Saturate: an unchecked `+= 1` panics on overflow in overflow-checked
        // builds.
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);

        if self.last_error.as_deref() != Some(msg) {
            // A different message (or the first one): remember it and restart
            // the repeat window.
            self.last_error = Some(msg.to_owned());
            self.repeats = 0;
            return true;
        }

        // Count repeats within the current window only; resetting on each
        // logged repeat keeps the counter bounded by `REPEAT_LOG_EVERY`.
        self.repeats += 1;
        if self.repeats >= REPEAT_LOG_EVERY {
            self.repeats = 0;
            true
        } else {
            false
        }
    }
}
`Ok(None)` /// means this spoke has no auth configured / no token stored (it syncs @@ -276,6 +316,7 @@ impl Daemon { let ctx = self.ctx.clone(); tokio::spawn(async move { let mut tick = tokio::time::interval(interval); + let mut log = SyncLoopLog::default(); loop { tick.tick().await; let bearer = match ctx.bearer().await { @@ -285,7 +326,13 @@ impl Daemon { // rejected refresh) and skip; sending an unauthenticated // request would only 401 and mask it. record_bearer_failure(&ctx, &e); - tracing::warn!("background sync: could not obtain bearer token: {e}"); + let msg = format!("could not obtain bearer token: {e}"); + if log.on_failure(&msg) { + tracing::warn!( + consecutive = log.consecutive_failures, + "background sync: {msg}" + ); + } continue; } }; @@ -293,8 +340,35 @@ impl Daemon { sync::sync_once(ctx.store.clone(), &hub, &ctx.http, bearer.as_deref()).await; record_sync_outcome(&ctx, &result); match result { - Ok(report) => tracing::debug!(?report, "background sync"), - Err(e) => tracing::warn!("background sync failed: {e}"), + Ok(report) => { + if let Some(failures) = log.on_success() { + tracing::info!(failures, "background sync recovered"); + } + // Cycles that move ops log their volume + cursor advance + // at info; idle cycles stay at debug. + if report.pulled + report.pushed > 0 { + tracing::info!( + pulled = report.pulled, + applied = report.applied, + pushed = report.pushed, + pull_cursor = report.pull_cursor.as_deref().unwrap_or("-"), + push_cursor = report.push_cursor.as_deref().unwrap_or("-"), + "background sync moved ops" + ); + } else { + tracing::debug!(?report, "background sync"); + } + } + // `:#` prints the anyhow context chain (phase + hub url). 
#[cfg(test)]
mod tests {
    use super::*;

    /// A success reports a recovery only when it ends a failure streak, and
    /// reports that streak exactly once.
    #[test]
    fn sync_loop_log_announces_recovery_after_a_failure_streak() {
        let mut throttle = SyncLoopLog::default();
        assert_eq!(
            throttle.on_success(),
            None,
            "no streak, nothing to announce"
        );

        // Three identical failures: only the first one logs.
        assert!(throttle.on_failure("hub down"));
        assert!(!throttle.on_failure("hub down"));
        assert!(!throttle.on_failure("hub down"));

        let recovered = throttle.on_success();
        assert_eq!(recovered, Some(3));
        let recovered_again = throttle.on_success();
        assert_eq!(recovered_again, None, "recovery announced once");
    }

    /// Identical messages are throttled to one log per `REPEAT_LOG_EVERY`
    /// repeats, while any change of message logs straight away.
    #[test]
    fn sync_loop_log_throttles_identical_failures_but_not_new_ones() {
        let mut throttle = SyncLoopLog::default();
        assert!(throttle.on_failure("hub down"), "first occurrence logs");

        // The next `REPEAT_LOG_EVERY - 1` repeats stay quiet.
        (1..REPEAT_LOG_EVERY).for_each(|_| {
            assert!(!throttle.on_failure("hub down"), "repeats are suppressed");
        });
        assert!(
            throttle.on_failure("hub down"),
            "every {REPEAT_LOG_EVERY}th repeat logs"
        );

        assert!(
            throttle.on_failure("dns broke"),
            "a new message logs immediately"
        );
        assert!(!throttle.on_failure("dns broke"));

        // Back to a previously-seen message: it changed, so it logs.
        assert!(throttle.on_failure("hub down"));
    }
}
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub push_cursor: Option, } /// Run `f` against the locked store on the blocking pool (DB calls never run on @@ -406,6 +412,7 @@ pub async fn sync_once( report.applied = applied; if let Some(cursor) = max_pulled { let hub = hub_url.to_string(); + report.pull_cursor = Some(cursor.clone()); with_store(&store, move |s| s.record_sync(&hub, None, Some(&cursor))).await?; } } @@ -432,6 +439,7 @@ pub async fn sync_once( .with_context(|| format!("sync push: hub {base} rejected the request"))?; if let Some(cursor) = max_pushed { let hub = hub_url.to_string(); + report.push_cursor = Some(cursor.clone()); with_store(&store, move |s| s.record_sync(&hub, Some(&cursor), None)).await?; } }