diff --git a/CHANGELOG.md b/CHANGELOG.md index 9799e3f..aa29354 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## [v1.4.0] - 2026-06-08 + +### Features + +- Spoke auth failures now tell you how to recover. When a refresh token is rejected or the hub returns 401, `hephd` records the real cause plus the exact `heph auth login --hub-url … --issuer … --client-id …` command (keyed to this spoke's hub) in its sync health. A new `heph auth status` prints that health and the re-login command, `heph sync --status`'s `last_error` carries it, and `heph-tui`'s status line points at it with a `⚠ auth · heph auth status` chip. +- `heph daemon start`/`restart` can now bake the daemon's full runtime config into the managed service — `--mode`, `--hub-url`, `--http-addr`, `--oidc-issuer`/`--oidc-audience`/`--oidc-client-id`, and `--self-update-interval-secs` (previously only the bare `--self-update` bool was wired). Regenerating preserves whatever is already baked into the on-disk plist/unit, so a bare `start`/`restart` no longer silently drops spoke/hub or self-update config. +- heph-tui's sync indicator now shows the last-sync age in seconds under a minute (`⟳ 26s`) instead of a flat `just now`, so the chip reads as a live heartbeat and a missed sync (the loop runs every 30s) shows up as the age climbing. + +### Bug Fixes + +- hephd no longer reports a rejected OAuth refresh as "identity provider unreachable". A reachable IdP that returns an HTTP error (e.g. `400 invalid_grant` once a refresh token expires/rotates) is now surfaced as a *rejection* — `identity provider rejected the request: HTTP 400 (invalid_grant): …` — with the OAuth error body, distinct from a genuine transport failure. This stops the wording from misdirecting incident response toward the network when the real fix is re-authentication. 
+- `heph daemon restart` on macOS no longer intermittently fails with `launchctl bootstrap failed: 5: Input/output error`. The old code bootstrapped immediately after `bootout`, racing launchd's asynchronous teardown; it now waits for the service to fully unload and retries the bootstrap. When the plist is unchanged (e.g. a plain binary upgrade) it uses `launchctl kickstart -k` to restart the loaded job atomically, sidestepping the bootout→bootstrap dance entirely. + + ## [v1.2.3] - 2026-06-06 ### Features diff --git a/Cargo.lock b/Cargo.lock index ade48f7..be8f974 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2196,7 +2196,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "heph" -version = "1.2.3" +version = "0.0.0" dependencies = [ "anyhow", "chrono", @@ -2210,7 +2210,7 @@ dependencies = [ [[package]] name = "heph-core" -version = "1.2.3" +version = "0.0.0" dependencies = [ "chrono", "proptest", @@ -2227,7 +2227,7 @@ dependencies = [ [[package]] name = "heph-quickadd" -version = "1.2.3" +version = "0.0.0" dependencies = [ "anyhow", "chrono", @@ -2243,7 +2243,7 @@ dependencies = [ [[package]] name = "heph-tui" -version = "1.2.3" +version = "0.0.0" dependencies = [ "anyhow", "chrono", @@ -2259,7 +2259,7 @@ dependencies = [ [[package]] name = "hephd" -version = "1.2.3" +version = "0.0.0" dependencies = [ "anyhow", "apple-native-keyring-store", diff --git a/Cargo.toml b/Cargo.toml index f265603..e24c881 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ members = [ [workspace.package] edition = "2021" -version = "1.2.3" +version = "0.0.0" license = "LicenseRef-Proprietary" publish = false authors = ["Erich Blume "] diff --git a/crates/heph-tui/src/fmt.rs b/crates/heph-tui/src/fmt.rs index 2383a7e..8c49ac9 100644 --- a/crates/heph-tui/src/fmt.rs +++ b/crates/heph-tui/src/fmt.rs @@ -30,13 +30,15 @@ pub fn now_ms() -> i64 { Local::now().timestamp_millis() } -/// A compact "how long ago" for the sync indicator: 
`just now` under a minute, -/// then `Nm` / `Nh` / `Nd`. Clamped at zero so a little clock skew never shows a -/// negative age. +/// A compact "how long ago" for the sync indicator: `Ns` under a minute, then +/// `Nm` / `Nh` / `Nd`. Second-granularity under a minute makes the chip a visible +/// heartbeat (the sync loop runs every 30s) and surfaces a missed beat as the age +/// climbing, rather than hiding under a flat "just now". Clamped at zero so a +/// little clock skew never shows a negative age. pub fn fmt_age(now_ms: i64, then_ms: i64) -> String { let secs = (now_ms - then_ms).max(0) / 1000; if secs < 60 { - "just now".into() + format!("{secs}s") } else if secs < 3_600 { format!("{}m", secs / 60) } else if secs < 86_400 { @@ -126,13 +128,14 @@ mod tests { #[test] fn age_is_compact_and_clamped() { let now = 1_000_000_000_000; - assert_eq!(fmt_age(now, now), "just now"); - assert_eq!(fmt_age(now, now - 30_000), "just now"); + assert_eq!(fmt_age(now, now), "0s"); + assert_eq!(fmt_age(now, now - 30_000), "30s"); + assert_eq!(fmt_age(now, now - 59_000), "59s"); assert_eq!(fmt_age(now, now - 5 * 60_000), "5m"); assert_eq!(fmt_age(now, now - 3 * 3_600_000), "3h"); assert_eq!(fmt_age(now, now - 2 * 86_400_000), "2d"); // Clock skew (then in the future) never shows a negative age. - assert_eq!(fmt_age(now, now + 10_000), "just now"); + assert_eq!(fmt_age(now, now + 10_000), "0s"); } #[test] diff --git a/crates/heph-tui/src/ui.rs b/crates/heph-tui/src/ui.rs index b457818..f6d2f37 100644 --- a/crates/heph-tui/src/ui.rs +++ b/crates/heph-tui/src/ui.rs @@ -570,7 +570,9 @@ fn sync_indicator(sync: &SyncStatus, now: i64) -> Vec> { let health = sync.health.clone().unwrap_or_default(); let mut spans = vec![if health.auth_failure { - Span::styled("⚠ auth", red) + // Point at the recovery command — `heph auth status` prints the exact + // `heph auth login …` to run (the full command is too long for the bar). 
+ Span::styled("⚠ auth · heph auth status", red) } else if let Some(ts) = health.last_success_ms { Span::styled(format!("⟳ {}", fmt_age(now, ts)), dim) } else if health.last_error.is_some() { @@ -639,7 +641,7 @@ mod tests { }, 0, ); - assert_eq!(render(&auth, NOW), "⚠ auth"); + assert_eq!(render(&auth, NOW), "⚠ auth · heph auth status"); // Errored with no prior success → offline. let offline = spoke( @@ -661,10 +663,7 @@ mod tests { last_success_ms: Some(NOW), ..Default::default() }; - assert_eq!( - render(&spoke(h.clone(), 1), NOW), - "⟳ just now ⚠ 1 conflict" - ); - assert_eq!(render(&spoke(h, 3), NOW), "⟳ just now ⚠ 3 conflicts"); + assert_eq!(render(&spoke(h.clone(), 1), NOW), "⟳ 0s ⚠ 1 conflict"); + assert_eq!(render(&spoke(h, 3), NOW), "⟳ 0s ⚠ 3 conflicts"); } } diff --git a/crates/heph/src/main.rs b/crates/heph/src/main.rs index c327f1d..28d3b5e 100644 --- a/crates/heph/src/main.rs +++ b/crates/heph/src/main.rs @@ -344,7 +344,7 @@ enum ConflictAction { }, } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Debug, Clone)] enum AuthAction { /// Log in via the device-code flow; caches the bearer token for hub sync. Login { @@ -367,6 +367,9 @@ enum AuthAction { #[arg(long)] hub_url: String, }, + /// Show this spoke's auth health and, if re-auth is needed, the exact + /// `heph auth login` command to run. Queries the daemon. + Status, } /// Run the device-code flow (or clear a token) — no daemon needed. @@ -396,10 +399,63 @@ fn run_auth(action: AuthAction) -> Result<()> { KeyringTokenStore::new(hub_url.as_str()).clear()?; println!("Logged out of {hub_url}."); } + AuthAction::Status => unreachable!("auth status is handled via the daemon"), } Ok(()) } +/// Render `heph auth status` from a `sync.status` RPC response: hub/issuer/client +/// id, whether auth is healthy or needs re-login, and — when it does — the exact +/// command to run (built daemon-side, keyed under the right hub URL). 
+fn print_auth_status(status: &Value) { + let Some(hub) = status.get("hub_url").and_then(Value::as_str) else { + println!("This instance is standalone (no hub configured); auth does not apply."); + return; + }; + let auth = status.get("auth"); + let issuer = auth.and_then(|a| a.get("issuer")).and_then(Value::as_str); + let client_id = auth + .and_then(|a| a.get("client_id")) + .and_then(Value::as_str); + let health = status.get("health"); + let auth_failure = health + .and_then(|h| h.get("auth_failure")) + .and_then(Value::as_bool) + .unwrap_or(false); + let last_error = health + .and_then(|h| h.get("last_error")) + .and_then(Value::as_str); + let last_success = health + .and_then(|h| h.get("last_success_ms")) + .and_then(Value::as_i64); + + println!("hub : {hub}"); + if let Some(iss) = issuer { + println!("issuer : {iss}"); + } + if let Some(cid) = client_id { + println!("client id : {cid}"); + } + println!( + "auth : {}", + if auth_failure { + "FAILED — re-authentication required" + } else if last_success.is_some() { + "ok" + } else { + "unknown (no successful sync yet)" + } + ); + if let Some(err) = last_error { + println!("last error : {err}"); + } + if auth_failure { + if let Some(cmd) = status.get("reauth_command").and_then(Value::as_str) { + println!("\nTo re-authenticate, run:\n {cmd}"); + } + } +} + fn main() -> Result<()> { let cli = Cli::parse(); @@ -407,9 +463,13 @@ fn main() -> Result<()> { if let Command::Daemon { action } = &cli.command { return service::run(action); } - // `auth` runs locally (device-code flow + keyring); it needs no daemon. - if let Command::Auth { action } = cli.command { - return run_auth(action); + // `auth login`/`logout` run locally (device-code flow + keyring); they need + // no daemon. `auth status` reads live sync health, so it falls through to the + // connected path below. 
+ if let Command::Auth { action } = &cli.command { + if !matches!(action, AuthAction::Status) { + return run_auth(action.clone()); + } } let socket = cli.socket.unwrap_or_else(default_socket_path); @@ -790,7 +850,13 @@ fn main() -> Result<()> { let n = result.as_u64().unwrap_or(0); println!("Rewrote legacy [[Name]] links to [[id]] in {n} node(s)."); } - Command::Auth { .. } => unreachable!("auth is handled before connecting"), + Command::Auth { + action: AuthAction::Status, + } => { + let result = client.call("sync.status", json!({}))?; + print_auth_status(&result); + } + Command::Auth { .. } => unreachable!("auth login/logout handled before connecting"), Command::Daemon { .. } => unreachable!("daemon is handled before connecting"), } Ok(()) diff --git a/crates/heph/src/service.rs b/crates/heph/src/service.rs index 7b15865..0b8928b 100644 --- a/crates/heph/src/service.rs +++ b/crates/heph/src/service.rs @@ -4,12 +4,19 @@ //! be shared by the CLI, TUI, and `heph.nvim` without any one of them owning its //! lifecycle. macOS uses a launchd **LaunchAgent**, Linux a **systemd user //! service**. All verbs are idempotent. +//! +//! The service generator bakes the daemon's runtime config — mode, sync hub, +//! and OIDC — into the unit so a spoke/hub can run under the managed service +//! instead of a hand-written plist/unit. Regenerating (`start`/`restart`) +//! **preserves any config already baked into the on-disk file**, so a bare +//! invocation never silently drops flags a previous one set. use std::path::{Path, PathBuf}; use std::process::Command; +use std::time::{Duration, Instant}; use anyhow::{bail, Context, Result}; -use clap::Subcommand; +use clap::{Args, Subcommand}; use hephd::{default_db_path, default_socket_path}; @@ -19,28 +26,106 @@ const LABEL: &str = "org.hephaestus.hephd"; #[derive(Subcommand, Debug)] pub enum DaemonAction { /// Install (if needed) and start the daemon service. 
-    Start {
-        /// Generate a service that runs with opt-in self-update enabled
-        /// (default off). The service gets a PATH that can find cargo.
-        #[arg(long)]
-        self_update: bool,
-    },
+    Start(ServiceArgs),
     /// Stop the daemon now (it may restart at next login; use `uninstall` to
     /// stop it for good).
     Stop,
     /// Restart the daemon — run this after upgrading the binary. Preserves the
-    /// existing self-update setting unless `--self-update` re-enables it.
-    Restart {
-        /// Force self-update on when regenerating the service definition.
-        #[arg(long)]
-        self_update: bool,
-    },
+    /// config already baked into the service file (pass flags to add/override).
+    Restart(ServiceArgs),
     /// Show whether the service is installed and running.
     Status,
     /// Stop and remove the service entirely.
     Uninstall,
 }

+/// Config flags baked into the generated service, shared by `start`/`restart`.
+/// Anything omitted falls back to what the on-disk service file already has, so
+/// regenerating is non-destructive.
+#[derive(Args, Debug)]
+pub struct ServiceArgs {
+    /// Runtime mode baked into the service (default `local`). Use `server` for a
+    /// sync hub, `client` for an online-only proxy.
+    #[arg(long, value_parser = ["local", "server", "client"])]
+    mode: Option<String>,
+    /// Hub to background-sync this replica's op-log with (makes it a spoke) —
+    /// bakes `--hub-url`.
+    #[arg(long)]
+    hub_url: Option<String>,
+    /// Hub HTTP listen address (server mode) — bakes `--http-addr`.
+    #[arg(long)]
+    http_addr: Option<String>,
+    /// OIDC issuer used to verify (server) or obtain (spoke) hub tokens — bakes
+    /// `--oidc-issuer`.
+    #[arg(long)]
+    oidc_issuer: Option<String>,
+    /// OIDC audience hub tokens must carry (server mode) — bakes
+    /// `--oidc-audience`.
+    #[arg(long)]
+    oidc_audience: Option<String>,
+    /// OIDC client id this device authenticates as (spoke) — bakes
+    /// `--oidc-client-id`.
+    #[arg(long)]
+    oidc_client_id: Option<String>,
+    /// Generate a service that runs with opt-in self-update enabled (default
+    /// off).
The service gets a PATH that can find cargo.
+    #[arg(long)]
+    self_update: bool,
+    /// Override the self-update poll interval, in seconds (default: 6h). Only
+    /// meaningful with --self-update.
+    #[arg(long)]
+    self_update_interval_secs: Option<u64>,
+}
+
+/// The hephd flags the service generator bakes beyond the fixed `--db`/`--socket`.
+#[derive(Default, Clone, PartialEq, Debug)]
+struct DaemonConfig {
+    mode: Option<String>,
+    hub_url: Option<String>,
+    http_addr: Option<String>,
+    oidc_issuer: Option<String>,
+    oidc_audience: Option<String>,
+    oidc_client_id: Option<String>,
+    self_update: bool,
+    self_update_interval_secs: Option<u64>,
+}
+
+impl ServiceArgs {
+    fn to_config(&self) -> DaemonConfig {
+        DaemonConfig {
+            mode: self.mode.clone(),
+            hub_url: self.hub_url.clone(),
+            http_addr: self.http_addr.clone(),
+            oidc_issuer: self.oidc_issuer.clone(),
+            oidc_audience: self.oidc_audience.clone(),
+            oidc_client_id: self.oidc_client_id.clone(),
+            self_update: self.self_update,
+            self_update_interval_secs: self.self_update_interval_secs,
+        }
+    }
+}
+
+impl DaemonConfig {
+    /// CLI-provided values win; anything omitted falls back to `base` (the flags
+    /// already baked into the on-disk service file), so regenerating the service
+    /// never drops config a previous invocation set. `self_update` is sticky —
+    /// it stays on if either the CLI or the existing file enabled it.
+    fn fill_from(self, base: DaemonConfig) -> DaemonConfig {
+        DaemonConfig {
+            mode: self.mode.or(base.mode),
+            hub_url: self.hub_url.or(base.hub_url),
+            http_addr: self.http_addr.or(base.http_addr),
+            oidc_issuer: self.oidc_issuer.or(base.oidc_issuer),
+            oidc_audience: self.oidc_audience.or(base.oidc_audience),
+            oidc_client_id: self.oidc_client_id.or(base.oidc_client_id),
+            self_update: self.self_update || base.self_update,
+            self_update_interval_secs: self
+                .self_update_interval_secs
+                .or(base.self_update_interval_secs),
+        }
+    }
+}
+
 /// Resolved locations the service definition needs.
 struct Paths {
     hephd: PathBuf,
@@ -114,6 +199,105 @@ pub fn run(action: &DaemonAction) -> Result<()> {
     }
 }

+// --------------------------------------------------------------------------
+// hephd argument vector (pure — shared by both renderers and unit-tested)
+// --------------------------------------------------------------------------
+
+/// The full `hephd …` argument vector the service runs, given the resolved paths
+/// and baked config. `--mode` defaults to `local`; the optional flags appear in
+/// a stable order so regenerating an unchanged config produces an identical file.
+fn hephd_args(hephd: &Path, db: &Path, socket: &Path, cfg: &DaemonConfig) -> Vec<String> {
+    let mut a = vec![
+        hephd.to_string_lossy().into_owned(),
+        "--mode".into(),
+        cfg.mode.clone().unwrap_or_else(|| "local".into()),
+        "--db".into(),
+        db.to_string_lossy().into_owned(),
+        "--socket".into(),
+        socket.to_string_lossy().into_owned(),
+    ];
+    push_opt(&mut a, "--hub-url", &cfg.hub_url);
+    push_opt(&mut a, "--http-addr", &cfg.http_addr);
+    push_opt(&mut a, "--oidc-issuer", &cfg.oidc_issuer);
+    push_opt(&mut a, "--oidc-audience", &cfg.oidc_audience);
+    push_opt(&mut a, "--oidc-client-id", &cfg.oidc_client_id);
+    // Interval is only meaningful with --self-update, so it's nested under it.
+    if cfg.self_update {
+        a.push("--self-update".into());
+        if let Some(secs) = cfg.self_update_interval_secs {
+            a.push("--self-update-interval-secs".into());
+            a.push(secs.to_string());
+        }
+    }
+    a
+}
+
+fn push_opt(args: &mut Vec<String>, flag: &str, val: &Option<String>) {
+    if let Some(v) = val {
+        args.push(flag.to_string());
+        args.push(v.clone());
+    }
+}
+
+/// Parse a `hephd` argument vector back into a [`DaemonConfig`] — the inverse of
+/// [`hephd_args`], used to recover config already baked into an on-disk service
+/// file. Unrecognized args (the binary path, `--db`, `--socket`) are ignored.
+fn parse_hephd_args(args: &[String]) -> DaemonConfig {
+    let mut c = DaemonConfig::default();
+    let mut i = 0;
+    while i < args.len() {
+        let next = || args.get(i + 1).cloned();
+        match args[i].as_str() {
+            "--mode" => {
+                c.mode = next();
+                i += 2;
+            }
+            "--hub-url" => {
+                c.hub_url = next();
+                i += 2;
+            }
+            "--http-addr" => {
+                c.http_addr = next();
+                i += 2;
+            }
+            "--oidc-issuer" => {
+                c.oidc_issuer = next();
+                i += 2;
+            }
+            "--oidc-audience" => {
+                c.oidc_audience = next();
+                i += 2;
+            }
+            "--oidc-client-id" => {
+                c.oidc_client_id = next();
+                i += 2;
+            }
+            "--self-update" => {
+                c.self_update = true;
+                i += 1;
+            }
+            "--self-update-interval-secs" => {
+                c.self_update_interval_secs = next().and_then(|s| s.parse().ok());
+                i += 2;
+            }
+            _ => i += 1,
+        }
+    }
+    c
+}
+
+/// Recover the config baked into an existing service file (empty if absent).
+fn existing_config(path: &Path, mgr: &Manager) -> DaemonConfig {
+    let Ok(s) = std::fs::read_to_string(path) else {
+        return DaemonConfig::default();
+    };
+    let args = match mgr {
+        Manager::Launchd => launchd_program_args(&s),
+        Manager::Systemd => systemd_exec_args(&s),
+    };
+    parse_hephd_args(&args)
+}
+
 // --------------------------------------------------------------------------
 // Rendering (pure — unit-tested)
 // --------------------------------------------------------------------------
@@ -124,17 +308,22 @@ fn xml_escape(s: &str) -> String {
         .replace('>', "&gt;")
 }

-fn launchd_plist(hephd: &Path, db: &Path, socket: &Path, log: &Path, self_update: bool) -> String {
-    let arg = |p: &Path| xml_escape(&p.to_string_lossy());
-    // Opt-in self-update: pass the flag, and give the service a PATH/HOME that
-    // can find cargo + the toolchain (a LaunchAgent's default env can't), since
-    // the apply path shells out to `cargo install`.
-    let self_update_arg = if self_update {
-        "\n <string>--self-update</string>".to_string()
-    } else {
-        String::new()
-    };
-    let cargo_env = if self_update {
+fn xml_unescape(s: &str) -> String {
+    s.replace("&lt;", "<")
+        .replace("&gt;", ">")
+        .replace("&amp;", "&")
+}
+
+fn launchd_plist(hephd: &Path, db: &Path, socket: &Path, log: &Path, cfg: &DaemonConfig) -> String {
+    let args_xml = hephd_args(hephd, db, socket, cfg)
+        .iter()
+        .map(|a| format!(" <string>{}</string>", xml_escape(a)))
+        .collect::<Vec<_>>()
+        .join("\n");
+    // Opt-in self-update needs a PATH/HOME that can find cargo + the toolchain
+    // (a LaunchAgent's default env can't), since the apply path shells out to
+    // `cargo install`.
+    let cargo_env = if cfg.self_update {
         let (path, home) = cargo_env();
         format!(
             "\n <key>PATH</key>\n <string>{}</string>\n <key>HOME</key>\n <string>{}</string>",
@@ -153,13 +342,7 @@ fn launchd_plist(hephd: &Path, db: &Path, socket: &Path, log: &Path, self_update
     <string>{label}</string>
     <key>ProgramArguments</key>
     <array>
-        <string>{hephd}</string>
-        <string>--mode</string>
-        <string>local</string>
-        <string>--db</string>
-        <string>{db}</string>
-        <string>--socket</string>
-        <string>{socket}</string>{self_update_arg}
+{args_xml}
     </array>
     <key>RunAtLoad</key>
     <true/>
@@ -181,10 +364,7 @@ fn launchd_plist(hephd: &Path, db: &Path, socket: &Path, log: &Path, self_update
 </plist>
 "#,
         label = LABEL,
-        hephd = arg(hephd),
-        db = arg(db),
-        socket = arg(socket),
-        log = arg(log),
+        log = xml_escape(&log.to_string_lossy()),
     )
 }

@@ -199,20 +379,34 @@ fn cargo_env() -> (String, String) {
     (path, home)
 }

-/// Whether an already-installed service file opted into self-update — so
-/// `restart` (which regenerates the file) preserves the setting instead of
-/// silently turning it off.
-fn file_opts_into_self_update(path: &Path) -> bool {
-    std::fs::read_to_string(path)
-        .map(|s| s.contains("--self-update"))
-        .unwrap_or(false)
+/// Extract the `ProgramArguments` strings from an existing launchd plist.
+fn launchd_program_args(plist: &str) -> Vec<String> {
+    let Some(k) = plist.find("ProgramArguments") else {
+        return vec![];
+    };
+    let rest = &plist[k..];
+    let (Some(start), Some(end)) = (rest.find("<array>"), rest.find("</array>")) else {
+        return vec![];
+    };
+    let block = &rest[start..end];
+    let mut out = vec![];
+    let mut cur = block;
+    while let Some(o) = cur.find("<string>") {
+        let after = &cur[o + "<string>".len()..];
+        let Some(c) = after.find("</string>") else {
+            break;
+        };
+        out.push(xml_unescape(&after[..c]));
+        cur = &after[c + "</string>".len()..];
+    }
+    out
 }

-fn systemd_unit(hephd: &Path, db: &Path, socket: &Path, self_update: bool) -> String {
-    // Opt-in self-update: pass the flag and give the unit a PATH/HOME that can
-    // find cargo + the toolchain, since the apply path runs `cargo install`.
-    let su_arg = if self_update { " --self-update" } else { "" };
-    let cargo_env = if self_update {
+fn systemd_unit(hephd: &Path, db: &Path, socket: &Path, cfg: &DaemonConfig) -> String {
+    let exec = hephd_args(hephd, db, socket, cfg).join(" ");
+    // Opt-in self-update needs a PATH/HOME that can find cargo + the toolchain,
+    // since the apply path runs `cargo install`.
+    let cargo_env = if cfg.self_update {
         let (path, home) = cargo_env();
         format!("Environment=PATH={path}\nEnvironment=HOME={home}\n")
     } else {
@@ -224,19 +418,24 @@ fn systemd_unit(hephd: &Path, db: &Path, socket: &Path, self_update: bool) -> St
          After=default.target\n\
          \n\
          [Service]\n\
-         ExecStart={hephd} --mode local --db {db} --socket {socket}{su_arg}\n\
+         ExecStart={exec}\n\
          {cargo_env}\
          Restart=always\n\
          RestartSec=1\n\
          \n\
          [Install]\n\
          WantedBy=default.target\n",
-        hephd = hephd.display(),
-        db = db.display(),
-        socket = socket.display(),
     )
 }

+/// Extract the `ExecStart=` argument vector from an existing systemd unit.
+fn systemd_exec_args(unit: &str) -> Vec<String> {
+    unit.lines()
+        .find_map(|l| l.strip_prefix("ExecStart="))
+        .map(|rest| rest.split_whitespace().map(str::to_string).collect())
+        .unwrap_or_default()
+}
+
 // --------------------------------------------------------------------------
 // Shared helpers
 // --------------------------------------------------------------------------
@@ -296,6 +495,51 @@ fn launchd_loaded(domain_target: &str) -> bool {
         .unwrap_or(false)
 }

+/// Block until `target` is no longer loaded, up to `timeout`. `launchctl bootout`
+/// is asynchronous in effect — it requests teardown and returns, but launchd may
+/// still be killing/reaping the job and removing its label from the domain.
+/// Bootstrapping while the label lingers fails with a generic `5: Input/output
+/// error`, so we wait for the label to actually disappear before re-bootstrapping.
+fn wait_until_unloaded(target: &str, timeout: Duration) {
+    let start = Instant::now();
+    while launchd_loaded(target) {
+        if start.elapsed() >= timeout {
+            break; // fall through; bootstrap's own retry covers the residual window
+        }
+        std::thread::sleep(Duration::from_millis(100));
+    }
+}
+
+/// Bootstrap the service, retrying briefly. Even once the old instance is gone,
+/// launchd can momentarily return EIO while the domain settles, so a couple of
+/// short retries make `start`/`restart` reliable instead of intermittently failing.
+fn launchd_bootstrap(domain: &str, plist: &str) -> Result<()> {
+    let mut last = String::new();
+    for attempt in 0..5 {
+        if attempt > 0 {
+            std::thread::sleep(Duration::from_millis(200));
+        }
+        let (ok, err) = run_cmd("launchctl", &["bootstrap", domain, plist])?;
+        if ok {
+            return Ok(());
+        }
+        last = err;
+    }
+    bail!("launchctl bootstrap failed: {}", last.trim());
+}
+
+/// Restart an already-loaded job in place (kills it, then launchd's KeepAlive —
+/// `-k` forces the kill).
This restarts the *loaded* job definition, so it does +/// not pick up an edited plist — callers use it only when the on-disk plist is +/// unchanged, where it sidesteps the bootout→bootstrap race entirely. +fn launchd_kickstart(target: &str) -> Result<()> { + let (ok, err) = run_cmd("launchctl", &["kickstart", "-k", target])?; + if !ok { + bail!("launchctl kickstart failed: {}", err.trim()); + } + Ok(()) +} + fn launchd(action: &DaemonAction, p: &Paths) -> Result<()> { let plist = launchd_plist_path()?; let uid = uid()?; @@ -303,18 +547,18 @@ fn launchd(action: &DaemonAction, p: &Paths) -> Result<()> { let target = format!("gui/{uid}/{LABEL}"); match action { - DaemonAction::Start { self_update } => { + DaemonAction::Start(args) => { + let cfg = args + .to_config() + .fill_from(existing_config(&plist, &Manager::Launchd)); write_if_changed( &plist, - &launchd_plist(&p.hephd, &p.db, &p.socket, &p.log, *self_update), + &launchd_plist(&p.hephd, &p.db, &p.socket, &p.log, &cfg), )?; if launchd_loaded(&target) { println!("heph daemon already running ({LABEL})."); } else { - let (ok, err) = run_cmd("launchctl", &["bootstrap", &domain, &plist_str(&plist)?])?; - if !ok { - bail!("launchctl bootstrap failed: {}", err.trim()); - } + launchd_bootstrap(&domain, &plist_str(&plist)?)?; println!("heph daemon started ({LABEL})."); } } @@ -322,16 +566,28 @@ fn launchd(action: &DaemonAction, p: &Paths) -> Result<()> { let (_ok, _err) = run_cmd("launchctl", &["bootout", &target])?; println!("heph daemon stopped (still installed; `uninstall` to remove)."); } - DaemonAction::Restart { self_update } => { - let su = *self_update || file_opts_into_self_update(&plist); - write_if_changed( + DaemonAction::Restart(args) => { + let cfg = args + .to_config() + .fill_from(existing_config(&plist, &Manager::Launchd)); + let changed = write_if_changed( &plist, - &launchd_plist(&p.hephd, &p.db, &p.socket, &p.log, su), + &launchd_plist(&p.hephd, &p.db, &p.socket, &p.log, &cfg), )?; - let _ = 
run_cmd("launchctl", &["bootout", &target])?; - let (ok, err) = run_cmd("launchctl", &["bootstrap", &domain, &plist_str(&plist)?])?; - if !ok { - bail!("launchctl bootstrap failed: {}", err.trim()); + if !launchd_loaded(&target) { + // Not currently loaded — nothing to tear down, just bring it up. + launchd_bootstrap(&domain, &plist_str(&plist)?)?; + } else if changed { + // The plist changed, so launchd must re-read it: a full reload is + // required. bootout is async, so wait for the label to clear + // before bootstrapping (and bootstrap retries the residual EIO). + let _ = run_cmd("launchctl", &["bootout", &target])?; + wait_until_unloaded(&target, Duration::from_secs(5)); + launchd_bootstrap(&domain, &plist_str(&plist)?)?; + } else { + // Same definition (e.g. binary upgraded in place) — restart the + // loaded job atomically, sidestepping the bootout→bootstrap race. + launchd_kickstart(&target)?; } println!("heph daemon restarted ({LABEL})."); } @@ -380,11 +636,11 @@ fn sc(args: &[&str]) -> Result<(bool, String)> { fn systemd(action: &DaemonAction, p: &Paths) -> Result<()> { let unit = systemd_unit_path()?; match action { - DaemonAction::Start { self_update } => { - write_if_changed( - &unit, - &systemd_unit(&p.hephd, &p.db, &p.socket, *self_update), - )?; + DaemonAction::Start(args) => { + let cfg = args + .to_config() + .fill_from(existing_config(&unit, &Manager::Systemd)); + write_if_changed(&unit, &systemd_unit(&p.hephd, &p.db, &p.socket, &cfg))?; sc(&["daemon-reload"])?; let (ok, err) = sc(&["enable", "--now", UNIT])?; if !ok { @@ -396,9 +652,11 @@ fn systemd(action: &DaemonAction, p: &Paths) -> Result<()> { sc(&["stop", UNIT])?; println!("heph daemon stopped (still enabled; `uninstall` to remove)."); } - DaemonAction::Restart { self_update } => { - let su = *self_update || file_opts_into_self_update(&unit); - write_if_changed(&unit, &systemd_unit(&p.hephd, &p.db, &p.socket, su))?; + DaemonAction::Restart(args) => { + let cfg = args + .to_config() + 
.fill_from(existing_config(&unit, &Manager::Systemd)); + write_if_changed(&unit, &systemd_unit(&p.hephd, &p.db, &p.socket, &cfg))?; sc(&["daemon-reload"])?; let (ok, err) = sc(&["restart", UNIT])?; if !ok { @@ -440,6 +698,18 @@ fn print_status(installed: bool, running: bool, p: &Paths, service_file: &Path) mod tests { use super::*; + fn spoke_cfg() -> DaemonConfig { + DaemonConfig { + mode: Some("local".into()), + hub_url: Some("http://hub.example:8787".into()), + oidc_issuer: Some("https://idp.example/o/heph/".into()), + oidc_client_id: Some("heph".into()), + self_update: true, + self_update_interval_secs: Some(600), + ..Default::default() + } + } + #[test] fn launchd_plist_has_label_args_and_paths() { let plist = launchd_plist( @@ -447,19 +717,21 @@ mod tests { Path::new("/home/e/.local/share/heph/heph.db"), Path::new("/tmp/heph/hephd.sock"), Path::new("/home/e/.local/share/heph/hephd.log"), - false, + &DaemonConfig::default(), ); assert!(plist.contains("org.hephaestus.hephd")); assert!(plist.contains("/usr/local/bin/hephd")); assert!(plist.contains("--mode")); + assert!(plist.contains("local")); assert!(plist.contains("/home/e/.local/share/heph/heph.db")); assert!(plist.contains("/tmp/heph/hephd.sock")); assert!(plist.contains("RunAtLoad")); assert!(plist.contains("KeepAlive")); assert!(plist.contains("hephd.log")); - // Default (no self-update): no flag, no cargo PATH baked in. + // Default (no self-update, no spoke/hub config): none of those flags. assert!(!plist.contains("--self-update")); assert!(!plist.contains(".cargo/bin")); + assert!(!plist.contains("--hub-url")); } #[test] @@ -469,12 +741,64 @@ mod tests { Path::new("/db"), Path::new("/sock"), Path::new("/log"), - true, + &DaemonConfig { + self_update: true, + ..Default::default() + }, ); assert!(plist.contains("--self-update")); assert!(plist.contains("PATH")); assert!(plist.contains(".cargo/bin")); assert!(plist.contains("HOME")); + // No interval given → no interval flag. 
+ assert!(!plist.contains("--self-update-interval-secs")); + } + + #[test] + fn launchd_plist_self_update_interval_is_baked_under_self_update() { + let with = launchd_plist( + Path::new("/hephd"), + Path::new("/db"), + Path::new("/sock"), + Path::new("/log"), + &DaemonConfig { + self_update: true, + self_update_interval_secs: Some(3600), + ..Default::default() + }, + ); + assert!(with.contains("--self-update-interval-secs")); + assert!(with.contains("3600")); + // Interval is meaningless without --self-update, so it's not emitted. + let without = launchd_plist( + Path::new("/hephd"), + Path::new("/db"), + Path::new("/sock"), + Path::new("/log"), + &DaemonConfig { + self_update: false, + self_update_interval_secs: Some(3600), + ..Default::default() + }, + ); + assert!(!without.contains("--self-update-interval-secs")); + } + + #[test] + fn launchd_plist_bakes_spoke_config() { + let plist = launchd_plist( + Path::new("/hephd"), + Path::new("/db"), + Path::new("/sock"), + Path::new("/log"), + &spoke_cfg(), + ); + assert!(plist.contains("--hub-url")); + assert!(plist.contains("http://hub.example:8787")); + assert!(plist.contains("--oidc-issuer")); + assert!(plist.contains("https://idp.example/o/heph/")); + assert!(plist.contains("--oidc-client-id")); + assert!(plist.contains("heph")); } #[test] @@ -483,7 +807,7 @@ mod tests { Path::new("/usr/local/bin/hephd"), Path::new("/home/e/.local/share/heph/heph.db"), Path::new("/run/user/1000/heph/hephd.sock"), - false, + &DaemonConfig::default(), ); assert!(unit.contains( "ExecStart=/usr/local/bin/hephd --mode local \ @@ -507,17 +831,96 @@ mod tests { Path::new("/usr/local/bin/hephd"), Path::new("/db"), Path::new("/sock"), - true, + &DaemonConfig { + self_update: true, + self_update_interval_secs: Some(3600), + ..Default::default() + }, ); - assert!(unit.contains("--self-update")); + assert!(unit.contains("--self-update --self-update-interval-secs 3600")); assert!(unit.contains("Environment=PATH=")); 
assert!(unit.contains(".cargo/bin")); assert!(unit.contains("Environment=HOME=")); } #[test] - fn xml_escape_escapes_markup() { + fn systemd_unit_bakes_hub_config() { + let unit = systemd_unit( + Path::new("/hephd"), + Path::new("/db"), + Path::new("/sock"), + &DaemonConfig { + mode: Some("server".into()), + http_addr: Some("0.0.0.0:8787".into()), + oidc_issuer: Some("https://idp.example/o/heph/".into()), + oidc_audience: Some("heph".into()), + ..Default::default() + }, + ); + assert!(unit.contains("--mode server")); + assert!(unit.contains("--http-addr 0.0.0.0:8787")); + assert!(unit.contains("--oidc-issuer https://idp.example/o/heph/")); + assert!(unit.contains("--oidc-audience heph")); + } + + #[test] + fn launchd_config_round_trips_through_the_plist() { + let cfg = spoke_cfg(); + let plist = launchd_plist( + Path::new("/hephd"), + Path::new("/db"), + Path::new("/sock"), + Path::new("/log"), + &cfg, + ); + let parsed = parse_hephd_args(&launchd_program_args(&plist)); + assert_eq!(parsed, cfg); + } + + #[test] + fn systemd_config_round_trips_through_the_unit() { + let cfg = DaemonConfig { + mode: Some("server".into()), + http_addr: Some("0.0.0.0:8787".into()), + oidc_issuer: Some("https://idp.example/o/heph/".into()), + oidc_audience: Some("heph".into()), + self_update: true, + self_update_interval_secs: Some(600), + ..Default::default() + }; + let unit = systemd_unit( + Path::new("/hephd"), + Path::new("/db"), + Path::new("/sock"), + &cfg, + ); + let parsed = parse_hephd_args(&systemd_exec_args(&unit)); + assert_eq!(parsed, cfg); + } + + #[test] + fn fill_from_preserves_existing_and_lets_cli_override() { + let existing = spoke_cfg(); + // A bare invocation (no flags) preserves everything baked in the file. + assert_eq!( + DaemonConfig::default().fill_from(existing.clone()), + existing + ); + // A CLI-provided value overrides; self_update stays sticky. 
+ let overridden = DaemonConfig { + self_update_interval_secs: Some(60), + ..Default::default() + } + .fill_from(existing.clone()); + assert_eq!(overridden.self_update_interval_secs, Some(60)); + assert_eq!(overridden.hub_url, existing.hub_url); + assert!(overridden.self_update); + } + + #[test] + fn xml_escape_round_trips() { assert_eq!(xml_escape("a & b < c > d"), "a &amp; b &lt; c &gt; d"); + assert_eq!(xml_unescape("a &amp; b &lt; c &gt; d"), "a & b < c > d"); } #[test] diff --git a/crates/hephd/src/auth.rs b/crates/hephd/src/auth.rs index c601d90..6b80e95 100644 --- a/crates/hephd/src/auth.rs +++ b/crates/hephd/src/auth.rs @@ -38,9 +38,45 @@ pub enum AuthError { /// The token was present but failed validation. #[error("invalid token: {0}")] Invalid(String), - /// The identity provider could not be reached to fetch keys. + /// The identity provider could not be reached at all (DNS, TLS, connection + /// refused, timeout) — a transport failure, distinct from a rejection. #[error("identity provider unreachable: {0}")] - Provider(String), + Unreachable(String), + /// The identity provider *was* reached but returned an HTTP error response — + /// e.g. `400 invalid_grant` on a refresh, meaning the token was rejected + /// (expired/rotated/session-invalidated), not that the IdP was down. The + /// distinction matters: "unreachable" sends debugging toward the network; + /// this points at the token/authorization. + #[error("identity provider rejected the request: {0}")] + Rejected(String), + /// Some other failure in the auth path that is neither a transport failure + /// nor an HTTP rejection — a malformed/unparseable IdP response, or a local + /// credential-store (keyring) error. Kept distinct so neither is mislabeled + /// as "unreachable". + #[error("auth error: {0}")] + Other(String), +} + +impl AuthError { + /// Build a [`AuthError::Rejected`] from an HTTP status and the OAuth error + /// body (RFC 6749 §5.2), e.g. `HTTP 400 (invalid_grant): Token is expired`. 
+ pub fn rejected(status: u16, error: Option<&str>, description: Option<&str>) -> AuthError { + let mut msg = format!("HTTP {status}"); + if let Some(e) = error.filter(|e| !e.is_empty()) { + msg.push_str(&format!(" ({e})")); + } + if let Some(d) = description.filter(|d| !d.is_empty()) { + msg.push_str(&format!(": {d}")); + } + AuthError::Rejected(msg) + } + + /// Whether this is an authorization-level rejection (the IdP refused the + /// grant) rather than a transport failure — i.e. re-authentication is the + /// likely fix, not network troubleshooting. + pub fn is_rejection(&self) -> bool { + matches!(self, AuthError::Rejected(_)) + } } /// Verifies a bearer token and returns its [`Claims`]. A trait so the hub can be @@ -92,16 +128,13 @@ impl OidcVerifier { .http .get(url) .call() - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Unreachable(e.to_string()))?; if !resp.status().is_success() { - return Err(AuthError::Provider(format!( - "{url} returned {}", - resp.status() - ))); + return Err(AuthError::rejected(resp.status().as_u16(), None, None)); } resp.body_mut() .read_json() - .map_err(|e| AuthError::Provider(e.to_string())) + .map_err(|e| AuthError::Unreachable(e.to_string())) } /// Resolve the JWKS URI from the provider's discovery document. @@ -169,3 +202,38 @@ impl TokenVerifier for OidcVerifier { Some((&self.issuer, &self.audience)) } } + +#[cfg(test)] +mod tests { + use super::AuthError; + + #[test] + fn rejected_formats_status_error_and_description() { + let e = AuthError::rejected(400, Some("invalid_grant"), Some("Token is not active")); + assert!(e.is_rejection()); + assert_eq!( + e.to_string(), + "identity provider rejected the request: HTTP 400 (invalid_grant): Token is not active" + ); + } + + #[test] + fn rejected_omits_absent_or_empty_oauth_fields() { + // No OAuth body (e.g. a bare 503) → just the status. 
+ assert_eq!( + AuthError::rejected(503, None, None).to_string(), + "identity provider rejected the request: HTTP 503" + ); + // Empty strings are treated as absent, not rendered as "()" / ": ". + assert_eq!( + AuthError::rejected(400, Some(""), Some("")).to_string(), + "identity provider rejected the request: HTTP 400" + ); + } + + #[test] + fn unreachable_is_not_a_rejection() { + assert!(!AuthError::Unreachable("connection refused".into()).is_rejection()); + assert!(!AuthError::Other("keyring locked".into()).is_rejection()); + } +} diff --git a/crates/hephd/src/client.rs b/crates/hephd/src/client.rs index c3c008b..8a2bd5d 100644 --- a/crates/hephd/src/client.rs +++ b/crates/hephd/src/client.rs @@ -2,59 +2,145 @@ //! //! Used by the `heph` CLI and by tests. Surfaces never touch SQLite directly //! (tech-spec §3) — they go through the daemon socket, which this wraps. +//! +//! The connection self-heals across daemon restarts (opt-in self-update, `heph +//! daemon restart`): a [`call`](Client::call) that finds the socket dropped +//! reconnects. It only auto-retries when the request provably never reached the +//! daemon (a write-side failure); a reply lost *after* sending is surfaced +//! rather than retried, so a mutation is never silently double-applied. use std::io::{BufRead, BufReader, Write}; use std::os::unix::net::UnixStream; -use std::path::Path; +use std::path::{Path, PathBuf}; -use anyhow::{bail, Context, Result}; +use anyhow::{anyhow, Context, Result}; use serde_json::{json, Value}; use crate::rpc::Response; /// A connected client. One request/response per [`call`](Client::call). pub struct Client { + socket_path: PathBuf, reader: BufReader, writer: UnixStream, next_id: u64, } +/// How a single request/response exchange failed — drives the retry decision. +enum ExchangeError { + /// The request could not be written (broken pipe, reset): it never reached + /// the daemon, so retrying on a fresh connection is safe. 
+ Send(anyhow::Error), + /// The request was sent but no reply came back (the daemon closed mid-flight, + /// e.g. it restarted): it may or may not have applied — do not retry. + Recv(anyhow::Error), + /// A well-formed RPC-level error (or an unparseable reply): the connection is + /// fine; nothing to reconnect. + Rpc(anyhow::Error), +} + +impl ExchangeError { + fn into_inner(self) -> anyhow::Error { + match self { + ExchangeError::Send(e) | ExchangeError::Recv(e) | ExchangeError::Rpc(e) => e, + } + } +} + impl Client { /// Connect to a daemon listening at `socket_path`. pub fn connect(socket_path: &Path) -> Result { - let stream = UnixStream::connect(socket_path) - .with_context(|| format!("connecting to hephd at {}", socket_path.display()))?; - let reader = BufReader::new(stream.try_clone()?); + let (reader, writer) = Self::open(socket_path)?; Ok(Client { + socket_path: socket_path.to_path_buf(), reader, - writer: stream, + writer, next_id: 1, }) } + /// Open a fresh reader/writer pair on the socket. + fn open(socket_path: &Path) -> Result<(BufReader, UnixStream)> { + let stream = UnixStream::connect(socket_path) + .with_context(|| format!("connecting to hephd at {}", socket_path.display()))?; + let reader = BufReader::new(stream.try_clone()?); + Ok((reader, stream)) + } + + /// Re-establish the connection (after the daemon restarted and dropped it). + fn reconnect(&mut self) -> Result<()> { + let (reader, writer) = Self::open(&self.socket_path)?; + self.reader = reader; + self.writer = writer; + Ok(()) + } + /// Call `method` with `params`, returning the `result` value (or an error /// carrying the RPC error's code and message). + /// + /// If the daemon has restarted and dropped the socket, this reconnects: it + /// retries transparently when the request never went out, and otherwise + /// reconnects for the next call while surfacing an error for this one (so a + /// mutation whose reply was lost is not silently re-applied). 
 pub fn call(&mut self, method: &str, params: Value) -> Result<Value> { let id = self.next_id; self.next_id += 1; - let mut line = serde_json::to_string(&json!({ "id": id, "method": method, "params": params, }))?; line.push('\n'); - self.writer.write_all(line.as_bytes())?; - self.writer.flush()?; + + match self.exchange(&line) { + Ok(v) => Ok(v), + Err(ExchangeError::Rpc(e)) => Err(e), + Err(ExchangeError::Send(_)) => { + // The request never reached the daemon — reconnect and retry once. + self.reconnect() + .context("hephd connection lost and reconnect failed")?; + self.exchange(&line) + .map_err(ExchangeError::into_inner) + .with_context(|| format!("retrying `{method}` after reconnect")) + } + Err(ExchangeError::Recv(e)) => { + // Sent but no reply: the daemon likely restarted mid-request. Don't + // retry (a mutation may have applied); reconnect for next time and + // surface this one. + let _ = self.reconnect(); + Err(e).context( + "hephd closed the connection mid-request (it likely restarted); \ + reconnected — re-run the action if it didn't take effect", + ) + } + } + } + + /// One request/response over the current connection, classifying failures. 
+ fn exchange(&mut self, line: &str) -> std::result::Result<Value, ExchangeError> { + self.writer + .write_all(line.as_bytes()) + .map_err(|e| ExchangeError::Send(e.into()))?; + self.writer + .flush() + .map_err(|e| ExchangeError::Send(e.into()))?; let mut response_line = String::new(); - let read = self.reader.read_line(&mut response_line)?; + let read = self + .reader + .read_line(&mut response_line) + .map_err(|e| ExchangeError::Recv(e.into()))?; if read == 0 { - bail!("hephd closed the connection"); + return Err(ExchangeError::Recv(anyhow!("hephd closed the connection"))); } - let response: Response = serde_json::from_str(&response_line)?; + let response: Response = + serde_json::from_str(&response_line).map_err(|e| ExchangeError::Rpc(e.into()))?; if let Some(err) = response.error { - bail!("rpc error {}: {}", err.code, err.message); + return Err(ExchangeError::Rpc(anyhow!( + "rpc error {}: {}", + err.code, + err.message + ))); } Ok(response.result.unwrap_or(Value::Null)) } diff --git a/crates/hephd/src/oauth.rs b/crates/hephd/src/oauth.rs index 53ee5f0..4af704f 100644 --- a/crates/hephd/src/oauth.rs +++ b/crates/hephd/src/oauth.rs @@ -109,7 +109,7 @@ impl KeyringTokenStore { } }); keyring_core::Entry::new(&self.service, &self.account) - .map_err(|e| AuthError::Provider(e.to_string())) + .map_err(|e| AuthError::Other(e.to_string())) } } @@ -119,16 +119,16 @@ impl TokenStore for KeyringTokenStore { serde_json::from_str(&secret).ok() } fn save(&self, token: &StoredToken) -> Result<(), AuthError> { - let json = serde_json::to_string(token).map_err(|e| AuthError::Provider(e.to_string()))?; + let json = serde_json::to_string(token).map_err(|e| AuthError::Other(e.to_string()))?; self.entry()? 
.set_password(&json) - .map_err(|e| AuthError::Provider(e.to_string())) + .map_err(|e| AuthError::Other(e.to_string())) } fn clear(&self) -> Result<(), AuthError> { match self.entry()?.delete_credential() { Ok(()) => Ok(()), Err(keyring_core::Error::NoEntry) => Ok(()), - Err(e) => Err(AuthError::Provider(e.to_string())), + Err(e) => Err(AuthError::Other(e.to_string())), } } } @@ -187,6 +187,9 @@ impl TokenResponse { #[derive(Debug, Deserialize)] struct TokenErrorBody { error: String, + /// Human-readable detail the provider may include (RFC 6749 §5.2). + #[serde(default)] + error_description: Option, } /// Drives the OAuth 2.0 device-code flow against one provider. @@ -208,17 +211,14 @@ impl DeviceFlow { let mut resp = http .get(&url) .call() - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Unreachable(e.to_string()))?; if !resp.status().is_success() { - return Err(AuthError::Provider(format!( - "discovery returned {}", - resp.status() - ))); + return Err(AuthError::rejected(resp.status().as_u16(), None, None)); } let doc: DiscoveryDoc = resp .body_mut() .read_json() - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Other(e.to_string()))?; Ok(DeviceFlow { client_id: client_id.to_string(), http, @@ -233,16 +233,13 @@ impl DeviceFlow { .http .post(&self.device_authorization_endpoint) .send_form([("client_id", self.client_id.as_str()), ("scope", scope)]) - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Unreachable(e.to_string()))?; if !resp.status().is_success() { - return Err(AuthError::Provider(format!( - "device authorization returned {}", - resp.status() - ))); + return Err(AuthError::rejected(resp.status().as_u16(), None, None)); } resp.body_mut() .read_json() - .map_err(|e| AuthError::Provider(e.to_string())) + .map_err(|e| AuthError::Other(e.to_string())) } /// Poll the token endpoint until the user authorizes, the code expires, or @@ -267,13 +264,13 @@ impl DeviceFlow { 
("device_code", auth.device_code.as_str()), ("client_id", self.client_id.as_str()), ]) - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Unreachable(e.to_string()))?; if response.status().is_success() { let token: TokenResponse = response .body_mut() .read_json() - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Other(e.to_string()))?; return Ok(token.into_stored()); } @@ -281,7 +278,7 @@ impl DeviceFlow { let body: TokenErrorBody = response .body_mut() .read_json() - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Other(e.to_string()))?; match body.error.as_str() { "authorization_pending" => {} "slow_down" => interval += 5, @@ -301,17 +298,24 @@ impl DeviceFlow { ("refresh_token", refresh_token), ("client_id", self.client_id.as_str()), ]) - .map_err(|e| AuthError::Provider(e.to_string()))?; + .map_err(|e| AuthError::Unreachable(e.to_string()))?; if !response.status().is_success() { - return Err(AuthError::Provider(format!( - "token refresh returned {}", - response.status() - ))); + // The IdP was reached and refused the grant (typically a `400 + // invalid_grant` once the refresh token is expired/rotated). Report + // it as a *rejection* with the OAuth error body — not "unreachable", + // which would misdirect debugging toward the network. + let status = response.status().as_u16(); + let body = response.body_mut().read_json::().ok(); + return Err(AuthError::rejected( + status, + body.as_ref().map(|b| b.error.as_str()), + body.as_ref().and_then(|b| b.error_description.as_deref()), + )); } let mut token: StoredToken = response .body_mut() .read_json::() - .map_err(|e| AuthError::Provider(e.to_string()))? + .map_err(|e| AuthError::Other(e.to_string()))? .into_stored(); // Providers may omit the refresh token on refresh — keep the old one. 
if token.refresh_token.is_none() { diff --git a/crates/hephd/src/server.rs b/crates/hephd/src/server.rs index 30c5d5a..89dee78 100644 --- a/crates/hephd/src/server.rs +++ b/crates/hephd/src/server.rs @@ -20,6 +20,7 @@ use tokio::net::{UnixListener, UnixStream}; use heph_core::Store; +use crate::auth::AuthError; use crate::oauth::{self, TokenStore}; use crate::rpc::{self, Request, Response, RpcError, INTERNAL_ERROR, PARSE_ERROR}; use crate::selfupdate::{self, SelfUpdateConfig}; @@ -80,10 +81,25 @@ fn is_auth_error(e: &anyhow::Error) -> bool { .is_some_and(|s| s == reqwest::StatusCode::UNAUTHORIZED) } -/// Fold one exchange outcome into the shared [`SyncHealth`]. -fn record_sync_outcome(health: &Arc>, result: &Result) { +/// The exact `heph auth login …` command that re-authenticates this spoke, built +/// from the hub URL + issuer + client id the daemon is configured with — so the +/// surfaced error tells the user *what to run*, not just that auth failed. +/// `None` for an unauthenticated / standalone instance. The hub-URL string must +/// match what the credential store is keyed under, which is exactly `hub_url`. +fn reauth_command(hub_url: Option<&str>, auth: Option<&SpokeAuth>) -> Option { + let (hub, auth) = (hub_url?, auth?); + Some(format!( + "heph auth login --hub-url {hub} --issuer {} --client-id {}", + auth.issuer, auth.client_id + )) +} + +/// Fold one exchange outcome into the shared [`SyncHealth`]. On an auth failure +/// (a 401 from the hub) the recorded error carries the actionable re-login +/// command, so `heph sync --status` / `heph auth status` / the TUI show the fix. 
+fn record_sync_outcome(ctx: &Ctx, result: &Result) { let now = now_ms(); - let mut h = health.lock().expect("sync_health mutex poisoned"); + let mut h = ctx.sync_health.lock().expect("sync_health mutex poisoned"); h.last_attempt_ms = Some(now); match result { Ok(_) => { @@ -92,28 +108,67 @@ fn record_sync_outcome(health: &Arc>, result: &Result { - h.auth_failure = is_auth_error(e); - h.last_error = Some(e.to_string()); + let auth_failure = is_auth_error(e); + h.auth_failure = auth_failure; + h.last_error = Some(annotate_reauth( + e.to_string(), + auth_failure, + ctx.hub_url.as_deref(), + ctx.auth.as_ref(), + )); } } } +/// Record a failure to obtain a bearer token (the refresh step, before any hub +/// request). A *rejection* (the IdP refused the refresh) is an auth failure and +/// gets the re-login hint; a transport failure stays a transient error. Surfacing +/// this here means `last_error` reflects the real cause (e.g. `invalid_grant`) +/// instead of only the downstream 401 on `/sync/pull`. +fn record_bearer_failure(ctx: &Ctx, err: &AuthError) { + let now = now_ms(); + let auth_failure = err.is_rejection(); + let mut h = ctx.sync_health.lock().expect("sync_health mutex poisoned"); + h.last_attempt_ms = Some(now); + h.auth_failure = auth_failure; + h.last_error = Some(annotate_reauth( + format!("could not obtain bearer token: {err}"), + auth_failure, + ctx.hub_url.as_deref(), + ctx.auth.as_ref(), + )); +} + +/// Append the actionable re-login command to `msg` when this is an auth failure +/// and the spoke has auth configured. +fn annotate_reauth( + msg: String, + auth_failure: bool, + hub_url: Option<&str>, + auth: Option<&SpokeAuth>, +) -> String { + match reauth_command(hub_url, auth) { + Some(cmd) if auth_failure => format!("{msg} — re-authenticate: {cmd}"), + _ => msg, + } +} + impl Ctx { - /// The current bearer token for hub sync (refreshing if expired), or `None` - /// if this spoke has no auth configured / no usable token. 
- async fn bearer(&self) -> Option { - let auth = self.auth.clone()?; - let result = tokio::task::spawn_blocking(move || { + /// The current bearer token for hub sync (refreshing if expired). `Ok(None)` + /// means this spoke has no auth configured / no token stored (it syncs + /// unauthenticated); `Err` means token acquisition genuinely failed (the + /// caller records it and skips the attempt rather than 401ing the hub). + async fn bearer(&self) -> Result, AuthError> { + let Some(auth) = self.auth.clone() else { + return Ok(None); + }; + match tokio::task::spawn_blocking(move || { oauth::current_bearer(auth.store.as_ref(), &auth.issuer, &auth.client_id) }) - .await; - match result { - Ok(Ok(token)) => token, - Ok(Err(e)) => { - tracing::warn!("could not obtain bearer token: {e}"); - None - } - Err(_) => None, + .await + { + Ok(res) => res, + Err(_join) => Ok(None), // the blocking task panicked; treat as no token } } } @@ -223,10 +278,20 @@ impl Daemon { let mut tick = tokio::time::interval(interval); loop { tick.tick().await; - let bearer = ctx.bearer().await; + let bearer = match ctx.bearer().await { + Ok(b) => b, + Err(e) => { + // Couldn't get a token — record the real cause (e.g. a + // rejected refresh) and skip; sending an unauthenticated + // request would only 401 and mask it. 
+ record_bearer_failure(&ctx, &e); + tracing::warn!("background sync: could not obtain bearer token: {e}"); + continue; + } + }; let result = sync::sync_once(ctx.store.clone(), &hub, &ctx.http, bearer.as_deref()).await; - record_sync_outcome(&ctx.sync_health, &result); + record_sync_outcome(&ctx, &result); match result { Ok(report) => tracing::debug!(?report, "background sync"), Err(e) => tracing::warn!("background sync failed: {e}"), @@ -321,9 +386,25 @@ async fn sync_now(ctx: &Ctx) -> Result { message: "no hub_url configured; this instance is standalone".into(), }); }; - let bearer = ctx.bearer().await; + let bearer = match ctx.bearer().await { + Ok(b) => b, + Err(e) => { + // Token acquisition failed — record the real cause (with a re-login + // hint when it's a rejection) and surface it instead of a downstream 401. + record_bearer_failure(ctx, &e); + return Err(RpcError { + code: INTERNAL_ERROR, + message: annotate_reauth( + format!("sync failed: could not obtain bearer token: {e}"), + e.is_rejection(), + ctx.hub_url.as_deref(), + ctx.auth.as_ref(), + ), + }); + } + }; let result = sync::sync_once(ctx.store.clone(), &hub_url, &ctx.http, bearer.as_deref()).await; - record_sync_outcome(&ctx.sync_health, &result); + record_sync_outcome(ctx, &result); match result { Ok(report) => Ok(json!(report)), Err(e) => Err(RpcError { @@ -374,10 +455,22 @@ async fn sync_status(ctx: &Ctx) -> Result { .expect("sync_health mutex poisoned") .clone(); + // Non-secret OIDC params (issuer/client-id) + the exact re-login command, so + // `heph auth status` can show the fix without reconstructing it client-side + // (and keyed under the right hub URL — see the per-URL token-keying gotcha). 
+ let auth = ctx.auth.as_ref().map(|a| { + json!({ + "issuer": a.issuer, + "client_id": a.client_id, + }) + }); + Ok(json!({ "hub_url": hub_url, "cursors": cursors, "conflicts": conflicts, "health": health, + "auth": auth, + "reauth_command": reauth_command(Some(&hub_url), ctx.auth.as_ref()), })) } diff --git a/crates/hephd/src/sync.rs b/crates/hephd/src/sync.rs index bfaa323..9beac05 100644 --- a/crates/hephd/src/sync.rs +++ b/crates/hephd/src/sync.rs @@ -261,8 +261,14 @@ async fn require_auth( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .map_err(|e| match e { - AuthError::Provider(_) => StatusCode::SERVICE_UNAVAILABLE, - _ => StatusCode::UNAUTHORIZED, + // The token itself is missing/bad → tell the client it's unauthorized. + AuthError::Missing | AuthError::Invalid(_) => StatusCode::UNAUTHORIZED, + // We couldn't reach/process the IdP to fetch verification keys — a + // transient hub-side problem, not the client's token. Ask them to + // retry rather than claiming their token is invalid. + AuthError::Unreachable(_) | AuthError::Rejected(_) | AuthError::Other(_) => { + StatusCode::SERVICE_UNAVAILABLE + } })?; // Multi-tenancy seam: resolve the token's identity to the owner it may act diff --git a/crates/hephd/tests/client_reconnect.rs b/crates/hephd/tests/client_reconnect.rs new file mode 100644 index 0000000..a4d0074 --- /dev/null +++ b/crates/hephd/tests/client_reconnect.rs @@ -0,0 +1,96 @@ +//! [`Client`] survives the daemon dropping the socket (opt-in self-update, `heph +//! daemon restart`). A mock daemon serves exactly one request per connection +//! then closes it, forcing the client to reconnect — without auto-reconnect, +//! every call after the first would fail forever. 
+ +use std::io::{BufRead, BufReader, Write}; +use std::os::unix::net::UnixListener; +use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +use hephd::Client; +use serde_json::{json, Value}; + +/// A mock daemon that handles ONE request per connection then closes it, looping +/// to accept the next connection. `served` counts total requests answered. +fn spawn_one_shot_daemon(socket: PathBuf, served: Arc) { + thread::spawn(move || { + let listener = UnixListener::bind(&socket).unwrap(); + for conn in listener.incoming() { + let Ok(mut stream) = conn else { continue }; + let mut reader = BufReader::new(stream.try_clone().unwrap()); + let mut line = String::new(); + if reader.read_line(&mut line).unwrap_or(0) == 0 { + continue; // client opened then went away; wait for the next one + } + let req: Value = serde_json::from_str(&line).unwrap(); + let n = served.fetch_add(1, Ordering::SeqCst) + 1; + let mut out = serde_json::to_string(&json!({ + "id": req["id"], + "result": { "served": n }, + })) + .unwrap(); + out.push('\n'); + let _ = stream.write_all(out.as_bytes()); + let _ = stream.flush(); + // `stream` drops here → the connection closes after one request. + } + }); +} + +fn wait_for(socket: &std::path::Path) { + for _ in 0..400 { + if socket.exists() { + return; + } + thread::sleep(Duration::from_millis(5)); + } + panic!("mock daemon socket never appeared"); +} + +#[test] +fn client_reconnects_after_the_daemon_drops_the_socket() { + let dir = tempfile::tempdir().unwrap(); + let socket = dir.path().join("d.sock"); + let served = Arc::new(AtomicUsize::new(0)); + spawn_one_shot_daemon(socket.clone(), served.clone()); + wait_for(&socket); + + let mut c = Client::connect(&socket).unwrap(); + + // First call works on the initial connection. + let r1 = c.call("ping", json!({})).unwrap(); + assert_eq!(r1["served"], 1); + + // The daemon has now closed that connection. 
With reconnect, the client + // recovers within a call or two (depending on whether the dead socket fails + // on write or on read); without it, every further call would fail forever. + let mut recovered = None; + for _ in 0..2 { + if let Ok(v) = c.call("ping", json!({})) { + recovered = Some(v); + break; + } + } + let r = recovered.expect("client should reconnect after the socket was dropped"); + // The recovered call was served exactly once on the new connection — no + // double-serve from a spurious retry. + assert_eq!(r["served"], 2); + assert_eq!(served.load(Ordering::SeqCst), 2); + + // And it keeps working across subsequent drops. + let r3 = { + let mut got = None; + for _ in 0..2 { + if let Ok(v) = c.call("ping", json!({})) { + got = Some(v); + break; + } + } + got.expect("client should keep reconnecting") + }; + assert_eq!(r3["served"], 3); +} diff --git a/crates/hephd/tests/oauth.rs b/crates/hephd/tests/oauth.rs index f61c872..0a1c709 100644 --- a/crates/hephd/tests/oauth.rs +++ b/crates/hephd/tests/oauth.rs @@ -90,11 +90,25 @@ async fn token(State(s): State, Form(form): Form Json(json!({ - "access_token": "access-2", - "expires_in": 3600, - })) - .into_response(), + Some("refresh_token") => { + // A rotated/expired refresh token is refused with `400 invalid_grant` + // (RFC 6749 §5.2) — the case that used to be mislabeled "unreachable". 
+ if form.get("refresh_token").map(String::as_str) == Some("refresh-expired") { + return ( + StatusCode::BAD_REQUEST, + Json(json!({ + "error": "invalid_grant", + "error_description": "Token is not active", + })), + ) + .into_response(); + } + Json(json!({ + "access_token": "access-2", + "expires_in": 3600, + })) + .into_response() + } _ => ( StatusCode::BAD_REQUEST, Json(json!({ "error": "unsupported_grant_type" })), @@ -129,6 +143,48 @@ fn refresh_keeps_the_old_refresh_token_when_omitted() { assert_eq!(refreshed.refresh_token.as_deref(), Some("refresh-1")); } +#[test] +fn refresh_rejected_by_idp_is_a_rejection_not_unreachable() { + let issuer = start_idp(); + let flow = DeviceFlow::discover(&issuer, "heph-cli").unwrap(); + let err = flow.refresh("refresh-expired").unwrap_err(); + // The whole point of the fix: a reachable IdP that returns 400 is a + // *rejection*, carrying the OAuth error body — not "unreachable". + assert!(err.is_rejection(), "expected a rejection, got: {err}"); + let msg = err.to_string(); + assert!( + msg.contains("rejected"), + "message should say rejected: {msg}" + ); + assert!( + msg.contains("invalid_grant"), + "should include the OAuth error: {msg}" + ); + assert!( + msg.contains("Token is not active"), + "should include error_description: {msg}" + ); + assert!( + !msg.contains("unreachable"), + "must NOT claim the IdP was unreachable: {msg}" + ); +} + +#[test] +fn discovery_against_a_dead_idp_is_unreachable_not_a_rejection() { + use hephd::AuthError; + // Port 1 refuses the connection → a genuine transport failure. 
+ let err = match DeviceFlow::discover("http://127.0.0.1:1/application/o/heph/", "heph-cli") { + Ok(_) => panic!("discovery should fail against a dead IdP"), + Err(e) => e, + }; + assert!( + matches!(err, AuthError::Unreachable(_)), + "a connection failure must be Unreachable, got: {err}" + ); + assert!(!err.is_rejection()); +} + #[test] fn memory_token_store_round_trips_and_reports_expiry() { let store = MemoryTokenStore::default(); diff --git a/docs/changelog.d/client-reconnect.bugfix.md b/docs/changelog.d/client-reconnect.bugfix.md new file mode 100644 index 0000000..ae987b8 --- /dev/null +++ b/docs/changelog.d/client-reconnect.bugfix.md @@ -0,0 +1 @@ +The `heph` CLI and `heph-tui` now survive a daemon restart. Previously the unix-socket client connected once and never reconnected, so an opt-in self-update or `heph daemon restart` left every subsequent call failing — `heph-tui` would sit on errors until relaunched. The client now reconnects on a dropped socket: a request that never went out is retried transparently, while a reply lost mid-request is surfaced (not silently retried) so a mutation is never double-applied. A long-running TUI self-heals on its next refresh tick. diff --git a/docs/how-to/run-the-daemon.md b/docs/how-to/run-the-daemon.md index 2b00dff..545b3be 100644 --- a/docs/how-to/run-the-daemon.md +++ b/docs/how-to/run-the-daemon.md @@ -36,14 +36,47 @@ when it's already stopped is fine. > exits cleanly to hand off to the new binary) wouldn't come back on Linux. Run > `heph daemon restart` once (it regenerates the unit) to pick up `Restart=always`. -Either way it runs `hephd --mode local` against the default store +By default it runs `hephd --mode local` against the default store (`~/.local/share/heph/heph.db`) and socket, with logs at -`~/.local/share/heph/hephd.log`. +`~/.local/share/heph/hephd.log`. Pass flags to `start`/`restart` to bake a +different runtime config into the service (see below). 
> **`stop` vs `uninstall`:** `stop` halts the daemon now, but the service is > still installed, so on macOS it starts again at next login. Use `uninstall` > to stop it persistently. +## Baking sync config (spoke / hub) + +By default the service runs a standalone `--mode local` daemon. To make the +managed service a **spoke** (background-syncs to a hub) or a **hub** (`--mode +server`), pass the corresponding `hephd` flags to `start` (or `restart`) — they +get baked into the generated plist/unit: + +```bash +# Spoke: sync to a hub, authenticating with OIDC +heph daemon start \ + --hub-url http://hub.example:8787 \ + --oidc-issuer https://idp.example/application/o/heph/ \ + --oidc-client-id heph + +# Hub: expose the authenticated sync endpoint +heph daemon start --mode server \ + --http-addr 0.0.0.0:8787 \ + --oidc-issuer https://idp.example/application/o/heph/ \ + --oidc-audience heph +``` + +Bakeable flags: `--mode`, `--hub-url`, `--http-addr`, `--oidc-issuer`, +`--oidc-audience`, `--oidc-client-id`, `--self-update`, +`--self-update-interval-secs`. **Regenerating preserves what's already baked +in** — `start`/`restart` read the existing service file and carry over any flags +you don't pass, so a bare `heph daemon restart` never drops your spoke/hub or +self-update config. Pass a flag again to add or override it. + +> Spoke sync is HTTP-only today (`hephd`'s sync client doesn't speak HTTPS) — a +> `--hub-url` over the tailnet or behind a TLS-terminating proxy is the usual +> setup. + ## After upgrading When you rebuild/reinstall (`cargo install … --force`), the running daemon is @@ -53,15 +86,25 @@ still the old binary until you restart it: heph daemon restart ``` +A restart (or an opt-in self-update) drops the daemon's unix socket out from +under any connected surface. 
The CLI and `heph-tui` **reconnect automatically**: +a read transparently retries on a fresh connection, and a long-running TUI +self-heals on its next tick — so a daemon restart no longer leaves the agenda +view stuck on errors. (A mutating action whose reply is lost mid-restart reports +"reconnected — re-run the action if it didn't take effect" rather than risk +applying twice.) + ## Self-update (opt-in) `hephd` can keep itself current: `heph daemon start --self-update` generates a service that polls the forge for newer releases and, when one appears, rebuilds via `cargo install` (anonymous HTTPS clone of the public repo — no credentials) and restarts onto the new binary. It is **off by default**; the generated -service also gets a `PATH` that can find cargo. `heph daemon restart` preserves -the setting (pass `--self-update` again to turn it on later). Requires the Rust -toolchain (`cargo`) installed for the service user. +service also gets a `PATH` that can find cargo. Override the 6h poll cadence with +`--self-update-interval-secs `. Both `start` and `restart` preserve an +already-baked self-update setting (and its interval), so a bare invocation won't +silently disable it — pass `--self-update` again only to turn it on later. +Requires the Rust toolchain (`cargo`) installed for the service user. ## Development isolation diff --git a/docs/how-to/self-update.md b/docs/how-to/self-update.md index d4dda1f..e7d0627 100644 --- a/docs/how-to/self-update.md +++ b/docs/how-to/self-update.md @@ -20,9 +20,17 @@ heph daemon start --self-update ``` That generates a launchd/systemd service that runs `hephd --self-update` and -gives it a `PATH` that can find `cargo`. `heph daemon restart` preserves the -setting (pass `--self-update` again to turn it on later). To run the daemon -directly instead: +gives it a `PATH` that can find `cargo`. 
Override the 6h poll cadence with +`--self-update-interval-secs `: + +```bash +heph daemon start --self-update # default: poll every 6h +heph daemon start --self-update --self-update-interval-secs 3600 +``` + +Both `start` and `restart` preserve an already-baked setting (the flag and its +interval), so a bare invocation won't silently disable it — pass `--self-update` +again only to turn it on later. To run the daemon directly instead: ```bash hephd --self-update # default: poll every 6h diff --git a/docs/how-to/set-up-sync-hub.md b/docs/how-to/set-up-sync-hub.md index a5b56ea..4d654a9 100644 --- a/docs/how-to/set-up-sync-hub.md +++ b/docs/how-to/set-up-sync-hub.md @@ -130,19 +130,41 @@ spoke is visible at a glance rather than buried in the daemon log. Make a change on `gilbert`, force a sync, and confirm it appears via the hub. +### When sync stops authenticating + +A spoke's refresh token can expire or be rotated (e.g. the IdP session lapses). +The spoke then can't refresh on its own and needs a re-login — but this is +**visible, not silent**: + +- `heph-tui` shows a red `⚠ auth · heph auth status` chip in the status line. +- `heph auth status` prints the auth health and the **exact** re-login command, + pre-filled with this spoke's hub URL / issuer / client id: + + ```bash + heph auth status + ``` + +- `heph sync --status`'s `last_error` names the real cause — a refresh + *rejection* (e.g. `HTTP 400 (invalid_grant)`), not a misleading "identity + provider unreachable" — and carries the same `heph auth login …` hint. + +Run the printed `heph auth login …` command to restore sync. + ## Current gaps (finalized by the blumeops deployment) -The flag-level flow above works today; two enablers make it a clean, managed +The flag-level flow above works today; one enabler makes it a clean, managed deployment rather than a hand-run process — tracked in the `Hephaestus` project: -- **`heph daemon` only generates a `--mode local` service** (no `--hub-url` / - `--oidc-*`). 
So for now the hub and the spoke config are expressed as `hephd` - flags (run directly, or via the blumeops-managed systemd unit), not via - `heph daemon start`. - **Path A seeding is manual** (copy the store + reset the device origin). A small enabler — seed a hub from a snapshot with a fresh origin, or `hephd --owner-id` — would make this one step. +> `heph daemon start`/`restart` can now bake the spoke/hub config (`--hub-url`, +> `--mode server`, `--http-addr`, `--oidc-*`) into the generated service (see +> [[run-the-daemon]]). The canonical hub on `indri` is still provisioned via the +> blumeops-managed systemd unit by deployment choice, not because `heph daemon` +> can't express it. + ## Related - [[run-the-daemon]] — manage the local daemon as an OS service