feat(hephd): sync observability — recovery, per-cycle volume, throttled failures

Background sync now logs cycles that move ops at info (pulled/applied/
pushed + the cursors they advanced to), announces recovery with the
length of the failure streak it ends, and suppresses repeats of an
identical failure to one warn per ten cycles. SyncReport carries the
advanced cursors (additive, wire-compatible).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Erich Blume 2026-06-09 10:49:37 -07:00
commit e65e2d3910
2 changed files with 121 additions and 3 deletions

View file

@ -153,6 +153,46 @@ fn annotate_reauth(
}
}
/// Log every 1-in-N repeats of an identical sync failure (the first occurrence
/// always logs).
const REPEAT_LOG_EVERY: u32 = 10;
/// Per-loop log throttling state for background sync: announce recovery after a
/// failure streak, and suppress repeats of an identical failure message so a
/// down hub doesn't write the same warning every cycle.
#[derive(Default)]
struct SyncLoopLog {
consecutive_failures: u32,
last_error: Option<String>,
repeats: u32,
}
impl SyncLoopLog {
/// Fold in a success. Returns `Some(n)` when this ends a streak of `n`
/// failures — the recovery transition worth announcing.
fn on_success(&mut self) -> Option<u32> {
let failures = std::mem::take(&mut self.consecutive_failures);
self.last_error = None;
self.repeats = 0;
(failures > 0).then_some(failures)
}
/// Fold in a failure. Returns whether this occurrence should log at warn
/// level: a new message always does; an identical repeat only every
/// [`REPEAT_LOG_EVERY`]-th time.
fn on_failure(&mut self, msg: &str) -> bool {
self.consecutive_failures += 1;
if self.last_error.as_deref() == Some(msg) {
self.repeats += 1;
self.repeats.is_multiple_of(REPEAT_LOG_EVERY)
} else {
self.last_error = Some(msg.to_string());
self.repeats = 0;
true
}
}
}
impl Ctx {
/// The current bearer token for hub sync (refreshing if expired). `Ok(None)`
/// means this spoke has no auth configured / no token stored (it syncs
@ -276,6 +316,7 @@ impl Daemon {
let ctx = self.ctx.clone();
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
let mut log = SyncLoopLog::default();
loop {
tick.tick().await;
let bearer = match ctx.bearer().await {
@ -285,7 +326,13 @@ impl Daemon {
// rejected refresh) and skip; sending an unauthenticated
// request would only 401 and mask it.
record_bearer_failure(&ctx, &e);
tracing::warn!("background sync: could not obtain bearer token: {e}");
let msg = format!("could not obtain bearer token: {e}");
if log.on_failure(&msg) {
tracing::warn!(
consecutive = log.consecutive_failures,
"background sync: {msg}"
);
}
continue;
}
};
@ -293,8 +340,35 @@ impl Daemon {
sync::sync_once(ctx.store.clone(), &hub, &ctx.http, bearer.as_deref()).await;
record_sync_outcome(&ctx, &result);
match result {
Ok(report) => tracing::debug!(?report, "background sync"),
Err(e) => tracing::warn!("background sync failed: {e}"),
Ok(report) => {
if let Some(failures) = log.on_success() {
tracing::info!(failures, "background sync recovered");
}
// Cycles that move ops log their volume + cursor advance
// at info; idle cycles stay at debug.
if report.pulled + report.pushed > 0 {
tracing::info!(
pulled = report.pulled,
applied = report.applied,
pushed = report.pushed,
pull_cursor = report.pull_cursor.as_deref().unwrap_or("-"),
push_cursor = report.push_cursor.as_deref().unwrap_or("-"),
"background sync moved ops"
);
} else {
tracing::debug!(?report, "background sync");
}
}
// `:#` prints the anyhow context chain (phase + hub url).
Err(e) => {
let msg = format!("{e:#}");
if log.on_failure(&msg) {
tracing::warn!(
consecutive = log.consecutive_failures,
"background sync failed: {msg}"
);
}
}
}
}
});
@ -474,3 +548,39 @@ async fn sync_status(ctx: &Ctx) -> Result<Value, RpcError> {
"reauth_command": reauth_command(Some(&hub_url), ctx.auth.as_ref()),
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sync_loop_log_announces_recovery_after_a_failure_streak() {
let mut log = SyncLoopLog::default();
assert_eq!(log.on_success(), None, "no streak, nothing to announce");
assert!(log.on_failure("hub down"));
assert!(!log.on_failure("hub down"));
assert!(!log.on_failure("hub down"));
assert_eq!(log.on_success(), Some(3));
assert_eq!(log.on_success(), None, "recovery announced once");
}
#[test]
fn sync_loop_log_throttles_identical_failures_but_not_new_ones() {
let mut log = SyncLoopLog::default();
assert!(log.on_failure("hub down"), "first occurrence logs");
for _ in 0..REPEAT_LOG_EVERY - 1 {
assert!(!log.on_failure("hub down"), "repeats are suppressed");
}
assert!(
log.on_failure("hub down"),
"every {REPEAT_LOG_EVERY}th repeat logs"
);
assert!(
log.on_failure("dns broke"),
"a new message logs immediately"
);
assert!(!log.on_failure("dns broke"));
// Back to a previously-seen message: it changed, so it logs.
assert!(log.on_failure("hub down"));
}
}

View file

@ -72,6 +72,12 @@ pub struct SyncReport {
pub applied: usize,
/// Ops sent to the hub.
pub pushed: usize,
/// The pull cursor (HLC) this exchange advanced to, if it moved.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pull_cursor: Option<String>,
/// The push cursor (HLC) this exchange advanced to, if it moved.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub push_cursor: Option<String>,
}
/// Run `f` against the locked store on the blocking pool (DB calls never run on
@ -406,6 +412,7 @@ pub async fn sync_once(
report.applied = applied;
if let Some(cursor) = max_pulled {
let hub = hub_url.to_string();
report.pull_cursor = Some(cursor.clone());
with_store(&store, move |s| s.record_sync(&hub, None, Some(&cursor))).await?;
}
}
@ -432,6 +439,7 @@ pub async fn sync_once(
.with_context(|| format!("sync push: hub {base} rejected the request"))?;
if let Some(cursor) = max_pushed {
let hub = hub_url.to_string();
report.push_cursor = Some(cursor.clone());
with_store(&store, move |s| s.record_sync(&hub, Some(&cursor), None)).await?;
}
}