From e65e2d3910b9775c8e8cf4162fc9720b5bb4568b Mon Sep 17 00:00:00 2001 From: Erich Blume Date: Tue, 9 Jun 2026 10:49:37 -0700 Subject: [PATCH] =?UTF-8?q?feat(hephd):=20sync=20observability=20=E2=80=94?= =?UTF-8?q?=20recovery,=20per-cycle=20volume,=20throttled=20failures?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background sync now logs cycles that move ops at info (pulled/applied/ pushed + the cursors they advanced to), announces recovery with the length of the failure streak it ends, and suppresses repeats of an identical failure to one warn per ten cycles. SyncReport carries the advanced cursors (additive, wire-compatible). Co-Authored-By: Claude Fable 5 --- crates/hephd/src/server.rs | 116 ++++++++++++++++++++++++++++++++++++- crates/hephd/src/sync.rs | 8 +++ 2 files changed, 121 insertions(+), 3 deletions(-) diff --git a/crates/hephd/src/server.rs b/crates/hephd/src/server.rs index 89dee78..b94c715 100644 --- a/crates/hephd/src/server.rs +++ b/crates/hephd/src/server.rs @@ -153,6 +153,46 @@ fn annotate_reauth( } } +/// Log every 1-in-N repeats of an identical sync failure (the first occurrence +/// always logs). +const REPEAT_LOG_EVERY: u32 = 10; + +/// Per-loop log throttling state for background sync: announce recovery after a +/// failure streak, and suppress repeats of an identical failure message so a +/// down hub doesn't write the same warning every cycle. +#[derive(Default)] +struct SyncLoopLog { + consecutive_failures: u32, + last_error: Option<String>, + repeats: u32, +} + +impl SyncLoopLog { + /// Fold in a success. Returns `Some(n)` when this ends a streak of `n` + /// failures — the recovery transition worth announcing. + fn on_success(&mut self) -> Option<u32> { + let failures = std::mem::take(&mut self.consecutive_failures); + self.last_error = None; + self.repeats = 0; + (failures > 0).then_some(failures) + } + + /// Fold in a failure. 
Returns whether this occurrence should log at warn + /// level: a new message always does; an identical repeat only every + /// [`REPEAT_LOG_EVERY`]-th time. + fn on_failure(&mut self, msg: &str) -> bool { + self.consecutive_failures += 1; + if self.last_error.as_deref() == Some(msg) { + self.repeats += 1; + self.repeats.is_multiple_of(REPEAT_LOG_EVERY) + } else { + self.last_error = Some(msg.to_string()); + self.repeats = 0; + true + } + } +} + impl Ctx { /// The current bearer token for hub sync (refreshing if expired). `Ok(None)` /// means this spoke has no auth configured / no token stored (it syncs @@ -276,6 +316,7 @@ impl Daemon { let ctx = self.ctx.clone(); tokio::spawn(async move { let mut tick = tokio::time::interval(interval); + let mut log = SyncLoopLog::default(); loop { tick.tick().await; let bearer = match ctx.bearer().await { @@ -285,7 +326,13 @@ impl Daemon { // rejected refresh) and skip; sending an unauthenticated // request would only 401 and mask it. record_bearer_failure(&ctx, &e); - tracing::warn!("background sync: could not obtain bearer token: {e}"); + let msg = format!("could not obtain bearer token: {e}"); + if log.on_failure(&msg) { + tracing::warn!( + consecutive = log.consecutive_failures, + "background sync: {msg}" + ); + } continue; } }; @@ -293,8 +340,35 @@ impl Daemon { sync::sync_once(ctx.store.clone(), &hub, &ctx.http, bearer.as_deref()).await; record_sync_outcome(&ctx, &result); match result { - Ok(report) => tracing::debug!(?report, "background sync"), - Err(e) => tracing::warn!("background sync failed: {e}"), + Ok(report) => { + if let Some(failures) = log.on_success() { + tracing::info!(failures, "background sync recovered"); + } + // Cycles that move ops log their volume + cursor advance + // at info; idle cycles stay at debug. 
+ if report.pulled + report.pushed > 0 { + tracing::info!( + pulled = report.pulled, + applied = report.applied, + pushed = report.pushed, + pull_cursor = report.pull_cursor.as_deref().unwrap_or("-"), + push_cursor = report.push_cursor.as_deref().unwrap_or("-"), + "background sync moved ops" + ); + } else { + tracing::debug!(?report, "background sync"); + } + } + // `:#` prints the anyhow context chain (phase + hub url). + Err(e) => { + let msg = format!("{e:#}"); + if log.on_failure(&msg) { + tracing::warn!( + consecutive = log.consecutive_failures, + "background sync failed: {msg}" + ); + } + } } } }); @@ -474,3 +548,39 @@ async fn sync_status(ctx: &Ctx) -> Result { "reauth_command": reauth_command(Some(&hub_url), ctx.auth.as_ref()), })) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sync_loop_log_announces_recovery_after_a_failure_streak() { + let mut log = SyncLoopLog::default(); + assert_eq!(log.on_success(), None, "no streak, nothing to announce"); + assert!(log.on_failure("hub down")); + assert!(!log.on_failure("hub down")); + assert!(!log.on_failure("hub down")); + assert_eq!(log.on_success(), Some(3)); + assert_eq!(log.on_success(), None, "recovery announced once"); + } + + #[test] + fn sync_loop_log_throttles_identical_failures_but_not_new_ones() { + let mut log = SyncLoopLog::default(); + assert!(log.on_failure("hub down"), "first occurrence logs"); + for _ in 0..REPEAT_LOG_EVERY - 1 { + assert!(!log.on_failure("hub down"), "repeats are suppressed"); + } + assert!( + log.on_failure("hub down"), + "every {REPEAT_LOG_EVERY}th repeat logs" + ); + assert!( + log.on_failure("dns broke"), + "a new message logs immediately" + ); + assert!(!log.on_failure("dns broke")); + // Back to a previously-seen message: it changed, so it logs. 
+ assert!(log.on_failure("hub down")); + } +} diff --git a/crates/hephd/src/sync.rs b/crates/hephd/src/sync.rs index 6b5a467..655e7a6 100644 --- a/crates/hephd/src/sync.rs +++ b/crates/hephd/src/sync.rs @@ -72,6 +72,12 @@ pub struct SyncReport { pub applied: usize, /// Ops sent to the hub. pub pushed: usize, + /// The pull cursor (HLC) this exchange advanced to, if it moved. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pull_cursor: Option<String>, + /// The push cursor (HLC) this exchange advanced to, if it moved. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub push_cursor: Option<String>, } /// Run `f` against the locked store on the blocking pool (DB calls never run on @@ -406,6 +412,7 @@ pub async fn sync_once( report.applied = applied; if let Some(cursor) = max_pulled { let hub = hub_url.to_string(); + report.pull_cursor = Some(cursor.clone()); with_store(&store, move |s| s.record_sync(&hub, None, Some(&cursor))).await?; } } @@ -432,6 +439,7 @@ pub async fn sync_once( .with_context(|| format!("sync push: hub {base} rejected the request"))?; if let Some(cursor) = max_pushed { let hub = hub_url.to_string(); + report.push_cursor = Some(cursor.clone()); with_store(&store, move |s| s.record_sync(&hub, Some(&cursor), None)).await?; } }