Reconnect the socket client across daemon restarts (heph-tui survives self-update) #15

Merged
eblume merged 1 commit from feature/client-reconnect into main 2026-06-08 15:22:15 -07:00
4 changed files with 204 additions and 13 deletions


@@ -2,59 +2,145 @@
//!
//! Used by the `heph` CLI and by tests. Surfaces never touch SQLite directly
//! (tech-spec §3) — they go through the daemon socket, which this wraps.
//!
//! The connection self-heals across daemon restarts (opt-in self-update, `heph
//! daemon restart`): a [`call`](Client::call) that finds the socket dropped
//! reconnects. It only auto-retries when the request provably never reached the
//! daemon (a write-side failure); a reply lost *after* sending is surfaced
//! rather than retried, so a mutation is never silently double-applied.
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use anyhow::{anyhow, Context, Result};
use serde_json::{json, Value};
use crate::rpc::Response;
/// A connected client. One request/response per [`call`](Client::call).
pub struct Client {
socket_path: PathBuf,
reader: BufReader<UnixStream>,
writer: UnixStream,
next_id: u64,
}
/// How a single request/response exchange failed — drives the retry decision.
enum ExchangeError {
/// The request could not be written (broken pipe, reset): it never reached
/// the daemon, so retrying on a fresh connection is safe.
Send(anyhow::Error),
/// The request was sent but no reply came back (the daemon closed mid-flight,
/// e.g. it restarted): it may or may not have applied — do not retry.
Recv(anyhow::Error),
/// A well-formed RPC-level error (or an unparseable reply): the connection is
/// fine; nothing to reconnect.
Rpc(anyhow::Error),
}
impl ExchangeError {
fn into_inner(self) -> anyhow::Error {
match self {
ExchangeError::Send(e) | ExchangeError::Recv(e) | ExchangeError::Rpc(e) => e,
}
}
}
impl Client {
/// Connect to a daemon listening at `socket_path`.
pub fn connect(socket_path: &Path) -> Result<Client> {
let (reader, writer) = Self::open(socket_path)?;
Ok(Client {
socket_path: socket_path.to_path_buf(),
reader,
writer,
next_id: 1,
})
}
/// Open a fresh reader/writer pair on the socket.
fn open(socket_path: &Path) -> Result<(BufReader<UnixStream>, UnixStream)> {
let stream = UnixStream::connect(socket_path)
.with_context(|| format!("connecting to hephd at {}", socket_path.display()))?;
let reader = BufReader::new(stream.try_clone()?);
Ok((reader, stream))
}
/// Re-establish the connection (after the daemon restarted and dropped it).
fn reconnect(&mut self) -> Result<()> {
let (reader, writer) = Self::open(&self.socket_path)?;
self.reader = reader;
self.writer = writer;
Ok(())
}
/// Call `method` with `params`, returning the `result` value (or an error
/// carrying the RPC error's code and message).
///
/// If the daemon has restarted and dropped the socket, this reconnects: it
/// retries transparently when the request never went out, and otherwise
/// reconnects for the next call while surfacing an error for this one (so a
/// mutation whose reply was lost is not silently re-applied).
pub fn call(&mut self, method: &str, params: Value) -> Result<Value> {
let id = self.next_id;
self.next_id += 1;
let mut line = serde_json::to_string(&json!({
"id": id,
"method": method,
"params": params,
}))?;
line.push('\n');
match self.exchange(&line) {
Ok(v) => Ok(v),
Err(ExchangeError::Rpc(e)) => Err(e),
Err(ExchangeError::Send(_)) => {
// The request never reached the daemon — reconnect and retry once.
self.reconnect()
.context("hephd connection lost and reconnect failed")?;
self.exchange(&line)
.map_err(ExchangeError::into_inner)
.with_context(|| format!("retrying `{method}` after reconnect"))
}
Err(ExchangeError::Recv(e)) => {
// Sent but no reply: the daemon likely restarted mid-request. Don't
// retry (a mutation may have applied); reconnect for next time and
// surface this one.
let _ = self.reconnect();
Err(e).context(
"hephd closed the connection mid-request (it likely restarted); \
reconnected; re-run the action if it didn't take effect",
)
}
}
}
/// One request/response over the current connection, classifying failures.
fn exchange(&mut self, line: &str) -> std::result::Result<Value, ExchangeError> {
self.writer
.write_all(line.as_bytes())
.map_err(|e| ExchangeError::Send(e.into()))?;
self.writer
.flush()
.map_err(|e| ExchangeError::Send(e.into()))?;
let mut response_line = String::new();
let read = self
.reader
.read_line(&mut response_line)
.map_err(|e| ExchangeError::Recv(e.into()))?;
if read == 0 {
return Err(ExchangeError::Recv(anyhow!("hephd closed the connection")));
}
let response: Response =
serde_json::from_str(&response_line).map_err(|e| ExchangeError::Rpc(e.into()))?;
if let Some(err) = response.error {
return Err(ExchangeError::Rpc(anyhow!(
"rpc error {}: {}",
err.code,
err.message
)));
}
Ok(response.result.unwrap_or(Value::Null))
}
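
The retry rule in `call` can be checked in isolation once the socket is abstracted away. A minimal sketch, assuming a hypothetical `Transport` trait and a scripted stand-in for the daemon (`Failure`, `Transport`, `Scripted` and `call_with_policy` are illustrative names, not hephd's API, and errors are plain `String`s for brevity). It applies the same three-way decision as `call` and counts how many requests actually reached the "daemon".

```rust
// Hypothetical sketch: none of these names exist in hephd.

/// How one exchange failed, mirroring the PR's Send / Recv / Rpc split.
#[derive(Debug)]
pub enum Failure {
    /// The request could not be written: it never reached the daemon.
    Send(String),
    /// The request was written but no reply came back: it may have applied.
    Recv(String),
    /// The daemon answered with an error: the connection itself is fine.
    Rpc(String),
}

/// Anything that can run one request/response and re-open its connection.
pub trait Transport {
    fn exchange(&mut self, line: &str) -> Result<String, Failure>;
    fn reconnect(&mut self) -> Result<(), String>;
}

/// Retry once, and only when the request provably never went out.
pub fn call_with_policy<T: Transport>(t: &mut T, line: &str) -> Result<String, String> {
    match t.exchange(line) {
        Ok(reply) => Ok(reply),
        Err(Failure::Rpc(e)) => Err(e),
        Err(Failure::Send(_)) => {
            // Nothing was delivered, so a second attempt cannot double-apply.
            t.reconnect()?;
            t.exchange(line).map_err(|f| match f {
                Failure::Send(e) | Failure::Recv(e) | Failure::Rpc(e) => {
                    format!("retry after reconnect failed: {e}")
                }
            })
        }
        Err(Failure::Recv(e)) => {
            // Possibly applied: reconnect for the next call, but do not resend.
            let _ = t.reconnect();
            Err(format!("{e}; reconnected, re-run if it didn't take effect"))
        }
    }
}

/// Scripted stand-in for the daemon: each exchange pops the next planned
/// outcome and counts how many requests actually arrived.
pub struct Scripted {
    pub plan: Vec<Result<String, Failure>>,
    pub delivered: usize,
    pub reconnects: usize,
}

impl Transport for Scripted {
    fn exchange(&mut self, _line: &str) -> Result<String, Failure> {
        let next = self.plan.remove(0);
        // Only a Send failure means the request never arrived.
        if !matches!(next, Err(Failure::Send(_))) {
            self.delivered += 1;
        }
        next
    }

    fn reconnect(&mut self) -> Result<(), String> {
        self.reconnects += 1;
        Ok(())
    }
}
```

With a plan of `[Send, Ok]` the call succeeds after one reconnect and exactly one request is delivered. With `[Recv, Ok]` it returns an error, reconnects once, and leaves the second planned reply unconsumed, which is the no-double-apply property the integration test checks against a real socket.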


@@ -0,0 +1,96 @@
//! [`Client`] survives the daemon dropping the socket (opt-in self-update, `heph
//! daemon restart`). A mock daemon serves exactly one request per connection
//! then closes it, forcing the client to reconnect — without auto-reconnect,
//! every call after the first would fail forever.
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use hephd::Client;
use serde_json::{json, Value};
/// A mock daemon that handles ONE request per connection then closes it, looping
/// to accept the next connection. `served` counts total requests answered.
fn spawn_one_shot_daemon(socket: PathBuf, served: Arc<AtomicUsize>) {
thread::spawn(move || {
let listener = UnixListener::bind(&socket).unwrap();
for conn in listener.incoming() {
let Ok(mut stream) = conn else { continue };
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut line = String::new();
if reader.read_line(&mut line).unwrap_or(0) == 0 {
continue; // client opened then went away; wait for the next one
}
let req: Value = serde_json::from_str(&line).unwrap();
let n = served.fetch_add(1, Ordering::SeqCst) + 1;
let mut out = serde_json::to_string(&json!({
"id": req["id"],
"result": { "served": n },
}))
.unwrap();
out.push('\n');
let _ = stream.write_all(out.as_bytes());
let _ = stream.flush();
// `stream` drops here → the connection closes after one request.
}
});
}
/// Block until the mock daemon has bound its socket (polls every 5 ms, up to 2 s).
fn wait_for(socket: &std::path::Path) {
for _ in 0..400 {
if socket.exists() {
return;
}
thread::sleep(Duration::from_millis(5));
}
panic!("mock daemon socket never appeared");
}
#[test]
fn client_reconnects_after_the_daemon_drops_the_socket() {
let dir = tempfile::tempdir().unwrap();
let socket = dir.path().join("d.sock");
let served = Arc::new(AtomicUsize::new(0));
spawn_one_shot_daemon(socket.clone(), served.clone());
wait_for(&socket);
let mut c = Client::connect(&socket).unwrap();
// First call works on the initial connection.
let r1 = c.call("ping", json!({})).unwrap();
assert_eq!(r1["served"], 1);
// The daemon has now closed that connection. With reconnect, the client
// recovers within a call or two (depending on whether the dead socket fails
// on write or on read); without it, every further call would fail forever.
let mut recovered = None;
for _ in 0..2 {
if let Ok(v) = c.call("ping", json!({})) {
recovered = Some(v);
break;
}
}
let r = recovered.expect("client should reconnect after the socket was dropped");
// The recovered call was served exactly once on the new connection — no
// double-serve from a spurious retry.
assert_eq!(r["served"], 2);
assert_eq!(served.load(Ordering::SeqCst), 2);
// And it keeps working across subsequent drops.
let r3 = {
let mut got = None;
for _ in 0..2 {
if let Ok(v) = c.call("ping", json!({})) {
got = Some(v);
break;
}
}
got.expect("client should keep reconnecting")
};
assert_eq!(r3["served"], 3);
}
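
The test's comment that a dropped socket fails "on write or on read" can be reproduced without a daemon using `UnixStream::pair`. A minimal sketch, assuming Linux or macOS stream-socket semantics; `write_to_dropped_peer` and `reply_lost_after_send` are hypothetical helpers, not part of hephd. Rust's standard runtime ignores `SIGPIPE` on Unix by default, so the failed write surfaces as an `io::Error` instead of terminating the process.

```rust
use std::io::{BufRead, BufReader, ErrorKind, Write};
use std::os::unix::net::UnixStream;

/// Write-side failure: the peer is gone before we send, so the request never
/// left (the case the client classifies as `Send` and retries).
pub fn write_to_dropped_peer() -> std::io::Result<Option<ErrorKind>> {
    let (mut client, daemon) = UnixStream::pair()?;
    drop(daemon); // simulate the daemon restarting and closing its end
    Ok(client.write_all(b"{\"id\":1}\n").err().map(|e| e.kind()))
}

/// Read-side failure: the daemon reads the request (and might apply it), then
/// closes without replying, so the client's `read_line` sees EOF (`Recv`).
pub fn reply_lost_after_send() -> std::io::Result<(String, usize)> {
    let (mut client, daemon) = UnixStream::pair()?;
    client.write_all(b"{\"id\":2}\n")?;
    // The "daemon" consumes the request line, then goes away.
    let mut request = String::new();
    BufReader::new(&daemon).read_line(&mut request)?;
    drop(daemon);
    // No reply ever arrives: read_line returns Ok(0) at end of stream.
    let mut reply = String::new();
    let n = BufReader::new(client).read_line(&mut reply)?;
    Ok((request, n))
}
```

Which of the two a real client hits after a restart depends on timing, which is why the integration test allows up to two calls to recover: a failed write is retried at once, while an EOF on read is surfaced and the next call runs on the fresh connection.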


@@ -0,0 +1 @@
The `heph` CLI and `heph-tui` now survive a daemon restart. Previously the unix-socket client connected once and never reconnected, so an opt-in self-update or `heph daemon restart` left every subsequent call failing — `heph-tui` would sit on errors until relaunched. The client now reconnects on a dropped socket: a request that never went out is retried transparently, while a reply lost mid-request is surfaced (not silently retried) so a mutation is never double-applied. A long-running TUI self-heals on its next refresh tick.


@@ -86,6 +86,14 @@ still the old binary until you restart it:
```
heph daemon restart
```
A restart (or an opt-in self-update) drops the daemon's unix socket out from
under any connected surface. The CLI and `heph-tui` **reconnect automatically**:
a request that never reached the daemon is retried transparently on a fresh
connection, and a long-running TUI self-heals on its next tick, so a daemon
restart no longer leaves the agenda view stuck on errors. (A request whose
reply is lost mid-restart is not retried, because a mutation may already have
applied; the error instead says the client reconnected and asks you to re-run
the action if it didn't take effect.)
## Self-update (opt-in)
`hephd` can keep itself current: `heph daemon start --self-update` generates a