generated from eblume/project-template
feat: daemon status surfaces runtime config + self-update state
sync.status now carries a runtime block — version, mode, sync cadence, and self-update state (interval + last check/outcome, tracked by a new SelfUpdateHealth shared with the poll loop). `heph daemon status` asks the live daemon and prints it under the service facts: hub + oidc, sync health at a glance, open-conflict count, self-update status. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
e65e2d3910
commit
0e5bed3282
5 changed files with 195 additions and 8 deletions
|
|
@ -691,6 +691,86 @@ fn print_status(installed: bool, running: bool, p: &Paths, service_file: &Path)
|
|||
println!("log : {}", p.log.display());
|
||||
if !running {
|
||||
println!("\n(start it with `heph daemon start`)");
|
||||
return;
|
||||
}
|
||||
print_runtime_status(&p.socket);
|
||||
}
|
||||
|
||||
/// Ask the live daemon (over its socket) for its runtime config + sync /
|
||||
/// self-update state, and print it under the service facts. Best-effort: a
|
||||
/// daemon that won't answer is reported, not an error.
|
||||
fn print_runtime_status(socket: &Path) {
|
||||
let status = hephd::Client::connect(socket)
|
||||
.and_then(|mut c| c.call("sync.status", serde_json::json!({})));
|
||||
let status = match status {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
println!("\n(daemon did not answer sync.status: {e})");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let s = |v: &serde_json::Value| v.as_str().map(str::to_string);
|
||||
let runtime = &status["runtime"];
|
||||
println!();
|
||||
if let Some(v) = s(&runtime["version"]) {
|
||||
println!("version : {v}");
|
||||
}
|
||||
if let Some(m) = s(&runtime["mode"]) {
|
||||
println!("mode : {m}");
|
||||
}
|
||||
match s(&status["hub_url"]) {
|
||||
Some(hub) => {
|
||||
let interval = runtime["sync_interval_secs"]
|
||||
.as_u64()
|
||||
.map(|n| format!(" (every {n}s)"))
|
||||
.unwrap_or_default();
|
||||
println!("hub : {hub}{interval}");
|
||||
if let Some(issuer) = s(&status["auth"]["issuer"]) {
|
||||
println!("oidc : {issuer}");
|
||||
}
|
||||
let health = &status["health"];
|
||||
match s(&health["last_error"]) {
|
||||
Some(err) => println!("sync : FAILING — {err}"),
|
||||
None => match health["last_success_ms"].as_i64() {
|
||||
Some(ms) => println!("sync : ok (last success {})", fmt_age(ms)),
|
||||
None => println!("sync : no exchange yet"),
|
||||
},
|
||||
}
|
||||
}
|
||||
None => println!("hub : (none — standalone)"),
|
||||
}
|
||||
if let Some(n) = status["conflicts"].as_u64() {
|
||||
if n > 0 {
|
||||
println!("conflicts : {n} open (see `heph conflicts list`)");
|
||||
}
|
||||
}
|
||||
match &runtime["self_update"] {
|
||||
serde_json::Value::Null => println!("selfupdate: off"),
|
||||
su => {
|
||||
let every = su["interval_secs"]
|
||||
.as_u64()
|
||||
.map(|n| format!("every {n}s"))
|
||||
.unwrap_or_default();
|
||||
let outcome = match (su["last_check_ms"].as_i64(), s(&su["last_outcome"])) {
|
||||
(Some(ms), Some(o)) => format!("; last check {}: {o}", fmt_age(ms)),
|
||||
_ => "; no check yet".to_string(),
|
||||
};
|
||||
println!("selfupdate: on, {every}{outcome}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// "Ns ago" for an epoch-ms timestamp (coarse, human-scale).
///
/// Buckets: under two minutes is shown in seconds, under two hours in whole
/// minutes, anything older in whole hours. A timestamp in the future (or a
/// system clock that reads before the Unix epoch) clamps to `"0s ago"`.
fn fmt_age(epoch_ms: i64) -> String {
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // `as_millis` is a u128; convert checked instead of truncating.
        .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0);
    // `epoch_ms` arrives over the wire, so it can be any i64. Saturate the
    // subtraction: a very negative timestamp must render as "very old", not
    // overflow (which panics in builds with overflow checks enabled).
    let secs = (now_ms.saturating_sub(epoch_ms) / 1000).max(0);
    match secs {
        0..=119 => format!("{secs}s ago"),
        120..=7199 => format!("{}m ago", secs / 60),
        _ => format!("{}h ago", secs / 3600),
    }
}
|
||||
|
||||
|
|
|
|||
|
|
@ -154,7 +154,9 @@ async fn main() -> Result<()> {
|
|||
};
|
||||
(
|
||||
None,
|
||||
Daemon::new(store).with_self_update(self_update.clone()),
|
||||
Daemon::new(store)
|
||||
.with_mode("client")
|
||||
.with_self_update(self_update.clone()),
|
||||
)
|
||||
}
|
||||
Mode::Local | Mode::Server => {
|
||||
|
|
@ -170,6 +172,11 @@ async fn main() -> Result<()> {
|
|||
spoke_auth(hub, cli.oidc_issuer.as_ref(), cli.oidc_client_id.as_ref())
|
||||
});
|
||||
let daemon = Daemon::new(store)
|
||||
.with_mode(if cli.mode == Mode::Server {
|
||||
"server"
|
||||
} else {
|
||||
"local"
|
||||
})
|
||||
.with_hub(cli.hub_url.clone())
|
||||
.with_spoke_auth(spoke)
|
||||
.with_self_update(self_update.clone());
|
||||
|
|
|
|||
|
|
@ -32,6 +32,29 @@ impl SelfUpdateConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// Observed poller state, shared with the daemon so `sync.status` (and through
|
||||
/// it `heph daemon status`) can report what self-update last did instead of
|
||||
/// only logging it. All times epoch ms; `None` means "no check yet".
|
||||
#[derive(Clone, Default, serde::Serialize)]
|
||||
pub struct SelfUpdateHealth {
|
||||
/// When the last release check completed.
|
||||
pub last_check_ms: Option<i64>,
|
||||
/// Human-readable outcome of that check.
|
||||
pub last_outcome: Option<String>,
|
||||
}
|
||||
|
||||
impl SelfUpdateHealth {
|
||||
fn record(health: &std::sync::Mutex<Self>, outcome: String) {
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_millis() as i64)
|
||||
.unwrap_or(0);
|
||||
let mut h = health.lock().expect("self-update health mutex poisoned");
|
||||
h.last_check_ms = Some(now);
|
||||
h.last_outcome = Some(outcome);
|
||||
}
|
||||
}
|
||||
|
||||
/// The forge releases feed for this project — the latest tagged release. The
|
||||
/// repo is public, so this is an unauthenticated GET on the canonical public
|
||||
/// host.
|
||||
|
|
@ -225,13 +248,15 @@ pub async fn apply_update(
|
|||
}
|
||||
|
||||
/// The background poll loop: tick on `interval`, check for a newer release, and
|
||||
/// when one is available, apply it. Runs forever; spawned as a task.
|
||||
/// when one is available, apply it. Each check's outcome is folded into
|
||||
/// `health` for `sync.status`. Runs forever; spawned as a task.
|
||||
pub async fn run_poll_loop<S: ReleaseSource>(
|
||||
source: S,
|
||||
installer: Arc<dyn Installer>,
|
||||
restarter: Arc<dyn Restarter>,
|
||||
interval: Duration,
|
||||
current: &'static str,
|
||||
health: Arc<std::sync::Mutex<SelfUpdateHealth>>,
|
||||
) {
|
||||
let mut tick = tokio::time::interval(interval);
|
||||
loop {
|
||||
|
|
@ -239,14 +264,22 @@ pub async fn run_poll_loop<S: ReleaseSource>(
|
|||
match check_release(&source, current).await {
|
||||
CheckOutcome::UpdateAvailable(tag) => {
|
||||
tracing::info!(%tag, current, "self-update: newer release available, applying");
|
||||
SelfUpdateHealth::record(&health, format!("applying update to {tag}"));
|
||||
// On success the restarter exits the process, so this only
|
||||
// returns on failure — log it and keep polling.
|
||||
if let Err(e) = apply_update(installer.clone(), restarter.clone(), &tag).await {
|
||||
tracing::error!("self-update: failed for {tag}: {e}");
|
||||
SelfUpdateHealth::record(&health, format!("install of {tag} failed: {e}"));
|
||||
}
|
||||
}
|
||||
CheckOutcome::UpToDate => tracing::debug!(current, "self-update: up to date"),
|
||||
CheckOutcome::Failed(e) => tracing::warn!("self-update: release check failed: {e}"),
|
||||
CheckOutcome::UpToDate => {
|
||||
tracing::debug!(current, "self-update: up to date");
|
||||
SelfUpdateHealth::record(&health, format!("up to date ({current})"));
|
||||
}
|
||||
CheckOutcome::Failed(e) => {
|
||||
tracing::warn!("self-update: release check failed: {e}");
|
||||
SelfUpdateHealth::record(&health, format!("release check failed: {e}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,6 +64,12 @@ struct Ctx {
|
|||
self_update: Option<SelfUpdateConfig>,
|
||||
/// Live sync health, shared between the background loop and `sync.status`.
|
||||
sync_health: Arc<Mutex<SyncHealth>>,
|
||||
/// Live self-update poller state, shared with `sync.status`.
|
||||
self_update_health: Arc<Mutex<selfupdate::SelfUpdateHealth>>,
|
||||
/// Runtime mode (`local`/`server`/`client`), for `sync.status`.
|
||||
mode: Option<String>,
|
||||
/// Background sync cadence, recorded when the loop is spawned.
|
||||
sync_interval_secs: Arc<Mutex<Option<u64>>>,
|
||||
}
|
||||
|
||||
/// Epoch-ms wall clock (the daemon may read it; only `heph-core` is clock-pure).
|
||||
|
|
@ -236,10 +242,19 @@ impl Daemon {
|
|||
auth: None,
|
||||
self_update: None,
|
||||
sync_health: Arc::new(Mutex::new(SyncHealth::default())),
|
||||
self_update_health: Arc::new(Mutex::new(selfupdate::SelfUpdateHealth::default())),
|
||||
mode: None,
|
||||
sync_interval_secs: Arc::new(Mutex::new(None)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Record the runtime mode (`local`/`server`/`client`) for `sync.status`.
|
||||
pub fn with_mode(mut self, mode: impl Into<String>) -> Daemon {
|
||||
self.ctx.mode = Some(mode.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure the hub this device syncs with (`sync.now` targets it).
|
||||
pub fn with_hub(mut self, hub_url: Option<String>) -> Daemon {
|
||||
self.ctx.hub_url = hub_url;
|
||||
|
|
@ -294,6 +309,7 @@ impl Daemon {
|
|||
current = heph_core::VERSION,
|
||||
"self-update enabled"
|
||||
);
|
||||
let health = self.ctx.self_update_health.clone();
|
||||
tokio::spawn(async move {
|
||||
selfupdate::run_poll_loop(
|
||||
source,
|
||||
|
|
@ -301,6 +317,7 @@ impl Daemon {
|
|||
restarter,
|
||||
cfg.interval,
|
||||
heph_core::VERSION,
|
||||
health,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
|
@ -313,6 +330,11 @@ impl Daemon {
|
|||
let Some(hub) = self.ctx.hub_url.clone() else {
|
||||
return;
|
||||
};
|
||||
*self
|
||||
.ctx
|
||||
.sync_interval_secs
|
||||
.lock()
|
||||
.expect("sync interval mutex poisoned") = Some(interval.as_secs());
|
||||
let ctx = self.ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut tick = tokio::time::interval(interval);
|
||||
|
|
@ -489,9 +511,10 @@ async fn sync_now(ctx: &Ctx) -> Result<Value, RpcError> {
|
|||
}
|
||||
|
||||
/// `sync.status` — the hub url, the current per-hub cursors, the observed sync
|
||||
/// health (last-success time / last error / auth-failure flag), and the pending
|
||||
/// merge-conflict count. A spoke that is silently failing is visible here (and,
|
||||
/// via it, in the TUI status line).
|
||||
/// health (last-success time / last error / auth-failure flag), the pending
|
||||
/// merge-conflict count, and the daemon's runtime config (version, mode, sync
|
||||
/// cadence, self-update state). A spoke that is silently failing is visible
|
||||
/// here (and, via it, in the TUI status line and `heph daemon status`).
|
||||
async fn sync_status(ctx: &Ctx) -> Result<Value, RpcError> {
|
||||
// Conflict count is meaningful even on a hub / standalone instance.
|
||||
let store = ctx.store.clone();
|
||||
|
|
@ -506,8 +529,35 @@ async fn sync_status(ctx: &Ctx) -> Result<Value, RpcError> {
|
|||
})?
|
||||
.map_err(RpcError::from)?;
|
||||
|
||||
// Runtime config: launch-time facts a client can't otherwise see.
|
||||
let self_update = ctx.self_update.as_ref().map(|cfg| {
|
||||
let h = ctx
|
||||
.self_update_health
|
||||
.lock()
|
||||
.expect("self-update health mutex poisoned")
|
||||
.clone();
|
||||
json!({
|
||||
"interval_secs": cfg.interval.as_secs(),
|
||||
"last_check_ms": h.last_check_ms,
|
||||
"last_outcome": h.last_outcome,
|
||||
})
|
||||
});
|
||||
let runtime = json!({
|
||||
"version": heph_core::VERSION,
|
||||
"mode": ctx.mode,
|
||||
"sync_interval_secs": *ctx
|
||||
.sync_interval_secs
|
||||
.lock()
|
||||
.expect("sync interval mutex poisoned"),
|
||||
"self_update": self_update,
|
||||
});
|
||||
|
||||
let Some(hub_url) = ctx.hub_url.clone() else {
|
||||
return Ok(json!({ "hub_url": Value::Null, "conflicts": conflicts }));
|
||||
return Ok(json!({
|
||||
"hub_url": Value::Null,
|
||||
"conflicts": conflicts,
|
||||
"runtime": runtime,
|
||||
}));
|
||||
};
|
||||
|
||||
let store = ctx.store.clone();
|
||||
|
|
@ -546,6 +596,7 @@ async fn sync_status(ctx: &Ctx) -> Result<Value, RpcError> {
|
|||
"health": health,
|
||||
"auth": auth,
|
||||
"reauth_command": reauth_command(Some(&hub_url), ctx.auth.as_ref()),
|
||||
"runtime": runtime,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -727,3 +727,19 @@ fn project_reparent_moves_and_rejects_cycles_over_socket() {
|
|||
.unwrap();
|
||||
assert_eq!(heph_row["parent_id"], Value::Null);
|
||||
}
|
||||
|
||||
/// A standalone test daemon (no hub, self-update off) still answers
/// `sync.status` with a runtime block that carries the build version.
#[test]
fn sync_status_reports_runtime_config() {
    let (socket, _dir) = spawn_daemon();
    let mut conn = client(&socket);
    let status = conn.call("sync.status", json!({})).unwrap();
    let runtime = &status["runtime"];

    // No hub is configured for the test daemon.
    assert!(status["hub_url"].is_null());
    // The version is always reported (mode is unset when not built via main()).
    assert_eq!(runtime["version"].as_str(), Some(heph_core::VERSION));
    // Self-update is off, so its slot is null.
    assert!(runtime["self_update"].is_null());
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue