From 5d54e913c23bee92871cf5938bdb8223e9844ae1 Mon Sep 17 00:00:00 2001 From: Erich Blume Date: Mon, 1 Jun 2026 15:29:28 -0700 Subject: [PATCH] hephd: client mode + RemoteStore (sync 9b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the online-only escape hatch — a no-replica daemon that proxies every Store call to a server over HTTP (tech-spec §3.1). - Daemon is now generic over the backing store (Arc>), so the same unix-socket surface fronts either a LocalStore (local/server) or a RemoteStore (client). sync::router/sync_once and the Ctx follow suit. - New POST /rpc route on the hub router runs the full rpc::dispatch over HTTP (result-xor-error body, always 200). dispatch gains task.get and links.add so the proxied API is complete. - RemoteStore (hephd): implements heph_core::Store by forwarding each call to /rpc via a blocking reqwest client (Store is sync; the daemon only calls it from the blocking pool). Error::Remote for transport failures; NOT_FOUND is preserved as Error::NodeNotFound. Sync primitives are stubbed (a client keeps no op-log). - main: --mode client + --server-url; client skips the file lock and opens no LocalStore. - tests/client_mode.rs: a RemoteStore drives node/task/search/list/health against a real HTTP server, and not-found maps back correctly. 102 tests green; clippy -D warnings + fmt + prek clean. Next: OIDC auth. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 18 ++ Cargo.toml | 1 + README.md | 10 +- crates/heph-core/src/error.rs | 5 + crates/hephd/src/lib.rs | 2 + crates/hephd/src/main.rs | 111 +++++++----- crates/hephd/src/remote.rs | 217 +++++++++++++++++++++++ crates/hephd/src/rpc.rs | 17 +- crates/hephd/src/server.rs | 14 +- crates/hephd/src/sync.rs | 51 +++++- crates/hephd/tests/client_mode.rs | 97 ++++++++++ docs/changelog.d/v1-prototype.feature.md | 1 + docs/reference/tech-spec.md | 10 +- 13 files changed, 486 insertions(+), 68 deletions(-) create mode 100644 crates/hephd/src/remote.rs create mode 100644 crates/hephd/tests/client_mode.rs diff --git a/Cargo.lock b/Cargo.lock index 296ab67..c901551 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -455,6 +455,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -463,6 +464,18 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + [[package]] name = "futures-task" version = "0.3.32" @@ -476,7 +489,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", + "futures-io", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -1199,7 +1215,9 @@ checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" dependencies = [ "base64", "bytes", + "futures-channel", "futures-core", + "futures-util", "http", "http-body", "http-body-util", diff --git a/Cargo.toml b/Cargo.toml index a2a291d..c9a922c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ axum = "0.8" reqwest = { version = "0.13", default-features = false, features = [ "json", "query", + "blocking", ] } [profile.release] diff --git a/README.md b/README.md index f706e25..f8ba65f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision ## Status -**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. The **local system is feature-complete and replicas now sync through a hub over HTTP** — the offline-first everyday config (`local` + `hub_url`) converges end-to-end, with a `yrs` text-CRDT merging bodies. Remaining: the online-only `client` mode, auth, and the Neovim plugin. Built test-first (100 tests at last update). The canonical tracker is **tech-spec §14**. +**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. **All three runtime modes are implemented and replicas sync through a hub over HTTP** — the offline-first everyday config (`local` + `hub_url`) converges end-to-end with a `yrs` text-CRDT merging bodies, and `client` mode proxies to a server with no local replica. Remaining: auth and the Neovim plugin. Built test-first (102 tests at last update). The canonical tracker is **tech-spec §14**. | Area | State | |---|---| @@ -21,8 +21,8 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision | Sync engine — HLC, op-log, converging merge + conflict queue (no network yet) | ✅ done | | yrs text-CRDT for body merge | ✅ done | | `server` (hub) mode + spoke push/pull sync over HTTP (axum) | ✅ done | -| `client` mode + `RemoteStore` (online-only, no replica) | ⏳ next | -| OIDC/Authentik auth + per-user isolation | ⏳ | +| `client` mode + `RemoteStore` (online-only, no replica) | ✅ done | +| OIDC/Authentik auth + per-user isolation | ⏳ next | | `heph.nvim` (primary surface) | ⏳ | ## Architecture @@ -30,7 +30,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision A Cargo workspace, layered so the same core runs from a laptop to a hub: - **`crates/heph-core`** — the library: data model, the `Store` trait + SQLite store, markdown parsing/extraction, recurrence, the "what is next?" engine, and the sync engine (op-log, hybrid logical clocks, CRDT/LWW merge, conflict detection). Synchronous and clock-injected (no ambient wall-clock reads) so ranking and merge are deterministic. -- **`crates/hephd`** — the per-device daemon. One binary, three modes — **`local`** (own SQLite replica; a syncing spoke when given `--hub-url`), **`server`** (also the sync hub: an HTTP endpoint others sync against), **`client`** *(planned)* (thin, remote, no replica) — selected by configuration via a targetable `Store` backend. Surfaces connect to it over a unix socket; it owns the DB handle and background sync. +- **`crates/hephd`** — the per-device daemon. One binary, three modes — **`local`** (own SQLite replica; a syncing spoke when given `--hub-url`), **`server`** (also the sync hub: an HTTP endpoint others sync against), **`client`** (thin, remote, no replica — proxies to a `--server-url`) — selected by configuration via a targetable `Store` backend. Surfaces connect to it over a unix socket; it owns the DB handle and background sync. - **`crates/heph`** — the CLI: a thin client of the daemon (no direct DB access). - **`heph.nvim/`** *(planned)* — the Neovim plugin, the primary editing/agenda surface. @@ -83,7 +83,7 @@ mise run ai-docs # docs AI agents read firs ``` ./Cargo.toml # workspace manifest ./crates/heph-core/ # core library: model, store, extraction, recurrence, ranking, sync -./crates/hephd/ # daemon: local + server (hub) modes — unix-socket RPC + HTTP sync; client planned +./crates/hephd/ # daemon: local/server/client modes — unix-socket RPC + HTTP sync/rpc ./crates/heph/ # CLI: thin client of the daemon ./heph.nvim/ # Neovim plugin (planned) ./docs/ # Diataxis docs (design, tech-spec, how-to), Quartz config diff --git a/crates/heph-core/src/error.rs b/crates/heph-core/src/error.rs index 79cf597..d0c3eb5 100644 --- a/crates/heph-core/src/error.rs +++ b/crates/heph-core/src/error.rs @@ -22,6 +22,11 @@ pub enum Error { /// A value in the database did not match the expected shape. #[error("data integrity: {0}")] Integrity(String), + + /// A remote backend (a `RemoteStore` in `client` mode) failed or returned an + /// error response (tech-spec §3.1). + #[error("remote: {0}")] + Remote(String), } /// Convenience result alias. diff --git a/crates/hephd/src/lib.rs b/crates/hephd/src/lib.rs index 85ebbfe..c665ecd 100644 --- a/crates/hephd/src/lib.rs +++ b/crates/hephd/src/lib.rs @@ -10,6 +10,7 @@ pub mod client; pub mod clock; pub mod lock; +pub mod remote; pub mod rpc; pub mod server; pub mod sync; @@ -19,6 +20,7 @@ use std::path::PathBuf; pub use client::Client; pub use clock::SystemClock; pub use lock::LockGuard; +pub use remote::RemoteStore; pub use server::Daemon; pub use sync::{sync_once, SyncReport}; diff --git a/crates/hephd/src/main.rs b/crates/hephd/src/main.rs index de54dfb..781009a 100644 --- a/crates/hephd/src/main.rs +++ b/crates/hephd/src/main.rs @@ -14,7 +14,9 @@ use clap::{Parser, ValueEnum}; use tokio::net::{TcpListener, UnixListener}; use heph_core::LocalStore; -use hephd::{default_db_path, default_socket_path, sync, Daemon, LockGuard, SystemClock}; +use hephd::{ + default_db_path, default_socket_path, sync, Daemon, LockGuard, RemoteStore, SystemClock, +}; /// How often a spoke background-syncs with its hub. const SYNC_INTERVAL: Duration = Duration::from_secs(30); @@ -27,6 +29,8 @@ enum Mode { Local, /// Also a sync hub: exposes the authenticated network endpoint over HTTP. Server, + /// No local replica; proxy every call to a `--server-url` (online-only). + Client, } /// The Hephaestus per-device daemon. @@ -52,6 +56,10 @@ struct Cli { /// Address for the hub HTTP endpoint (server mode only). #[arg(long)] http_addr: Option, + + /// Server to proxy to (client mode only; required there). + #[arg(long)] + server_url: Option, } #[tokio::main] @@ -64,56 +72,71 @@ async fn main() -> Result<()> { .init(); let cli = Cli::parse(); - let db = cli.db.unwrap_or_else(default_db_path); - let socket = cli.socket.unwrap_or_else(default_socket_path); - - if let Some(parent) = db.parent() { - std::fs::create_dir_all(parent) - .with_context(|| format!("creating store dir {}", parent.display()))?; - } + let socket = cli.socket.clone().unwrap_or_else(default_socket_path); if let Some(parent) = socket.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("creating socket dir {}", parent.display()))?; } - // Take the exclusive lock before opening the store (tech-spec §3.1). - let _lock = LockGuard::acquire(&db)?; - let store = LocalStore::open(&db, Box::new(SystemClock))?; - let daemon = Daemon::new(store).with_hub(cli.hub_url.clone()); - - // server mode: expose the hub HTTP endpoint over the same store. - if cli.mode == Mode::Server { - let addr = cli - .http_addr - .clone() - .unwrap_or_else(|| DEFAULT_HTTP_ADDR.to_string()); - let app = sync::router(daemon.store()); - let listener = TcpListener::bind(&addr) - .await - .with_context(|| format!("binding hub HTTP endpoint {addr}"))?; - tracing::info!(%addr, "hub HTTP endpoint listening"); - tokio::spawn(async move { - if let Err(e) = axum::serve(listener, app).await { - tracing::error!("hub HTTP endpoint stopped: {e}"); + // Build the daemon for the chosen mode. `local`/`server` own the file (and + // hold its lock for the process's life); `client` keeps no replica. + let (_lock, daemon) = match cli.mode { + Mode::Client => { + let server_url = cli + .server_url + .clone() + .context("client mode requires --server-url")?; + tracing::info!(%server_url, "client mode: proxying to server (no local replica)"); + (None, Daemon::new(RemoteStore::new(&server_url))) + } + Mode::Local | Mode::Server => { + let db = cli.db.clone().unwrap_or_else(default_db_path); + if let Some(parent) = db.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("creating store dir {}", parent.display()))?; } - }); - } + // Take the exclusive lock before opening the store (tech-spec §3.1). + let lock = LockGuard::acquire(&db)?; + let store = LocalStore::open(&db, Box::new(SystemClock))?; + let daemon = Daemon::new(store).with_hub(cli.hub_url.clone()); - // spoke: background-sync the op-log with the configured hub. - if let Some(hub) = cli.hub_url.clone() { - let store = daemon.store(); - tokio::spawn(async move { - let http = reqwest::Client::new(); - let mut tick = tokio::time::interval(SYNC_INTERVAL); - loop { - tick.tick().await; - match hephd::sync_once(store.clone(), &hub, &http).await { - Ok(report) => tracing::debug!(?report, "background sync"), - Err(e) => tracing::warn!("background sync failed: {e}"), - } + // server mode: expose the hub HTTP endpoint over the same store. + if cli.mode == Mode::Server { + let addr = cli + .http_addr + .clone() + .unwrap_or_else(|| DEFAULT_HTTP_ADDR.to_string()); + let app = sync::router(daemon.store()); + let http_listener = TcpListener::bind(&addr) + .await + .with_context(|| format!("binding hub HTTP endpoint {addr}"))?; + tracing::info!(%addr, "hub HTTP endpoint listening"); + tokio::spawn(async move { + if let Err(e) = axum::serve(http_listener, app).await { + tracing::error!("hub HTTP endpoint stopped: {e}"); + } + }); } - }); - } + + // spoke: background-sync the op-log with the configured hub. + if let Some(hub) = cli.hub_url.clone() { + let store = daemon.store(); + tokio::spawn(async move { + let http = reqwest::Client::new(); + let mut tick = tokio::time::interval(SYNC_INTERVAL); + loop { + tick.tick().await; + match hephd::sync_once(store.clone(), &hub, &http).await { + Ok(report) => tracing::debug!(?report, "background sync"), + Err(e) => tracing::warn!("background sync failed: {e}"), + } + } + }); + } + + (Some(lock), daemon) + } + }; // Replace any stale socket from a previous run, then bind. if socket.exists() { @@ -123,6 +146,6 @@ async fn main() -> Result<()> { let listener = UnixListener::bind(&socket) .with_context(|| format!("binding socket {}", socket.display()))?; - tracing::info!(db = %db.display(), socket = %socket.display(), mode = ?cli.mode, "hephd listening"); + tracing::info!(socket = %socket.display(), mode = ?cli.mode, "hephd listening"); daemon.serve(listener).await } diff --git a/crates/hephd/src/remote.rs b/crates/hephd/src/remote.rs new file mode 100644 index 0000000..db5dfea --- /dev/null +++ b/crates/hephd/src/remote.rs @@ -0,0 +1,217 @@ +//! `RemoteStore` — the `client`-mode backend (tech-spec §3.1). +//! +//! A `client` keeps **no local replica**: it proxies every [`Store`] call to a +//! `server`'s `POST /rpc` endpoint (the same [`crate::rpc::dispatch`] the unix +//! socket runs). Since `Store` is synchronous, this uses a **blocking** reqwest +//! client; the daemon only ever calls store methods from its blocking pool, so +//! that is safe. +//! +//! With no op-log of its own, the sync primitives (`ops_since`/`apply_op`/ +//! `sync_state`/`record_sync`) are not meaningful here — a `client` never +//! background-syncs, it reads and writes the hub live. They are stubbed +//! accordingly; the daemon never invokes them in this mode. + +use serde::de::DeserializeOwned; +use serde_json::{json, Value}; + +use heph_core::{ + Attention, Conflict, Error, Health, Link, LinkType, NewNode, NewTask, Node, Result, Store, + SyncCursors, Task, TaskState, +}; + +use crate::rpc::{Response, NOT_FOUND}; + +/// A no-replica store that proxies to a `server` over HTTP. +pub struct RemoteStore { + base: String, + http: reqwest::blocking::Client, +} + +impl RemoteStore { + /// Point a client at `server_url` (e.g. `http://hub.example:8787`). + pub fn new(server_url: &str) -> RemoteStore { + RemoteStore { + base: server_url.trim_end_matches('/').to_string(), + http: reqwest::blocking::Client::new(), + } + } + + /// Issue one `/rpc` call, returning the raw `result` value. + fn call(&self, method: &str, params: Value) -> Result { + let response: Response = self + .http + .post(format!("{}/rpc", self.base)) + .json(&json!({ "method": method, "params": params })) + .send() + .and_then(reqwest::blocking::Response::error_for_status) + .map_err(|e| Error::Remote(e.to_string()))? + .json() + .map_err(|e| Error::Remote(e.to_string()))?; + if let Some(err) = response.error { + // Preserve "not found" so callers keep the typed contract. + return Err(if err.code == NOT_FOUND { + Error::NodeNotFound(err.message) + } else { + Error::Remote(err.message) + }); + } + Ok(response.result.unwrap_or(Value::Null)) + } + + /// Call `method` and decode the result into `T`. + fn call_as(&self, method: &str, params: Value) -> Result { + let value = self.call(method, params)?; + serde_json::from_value(value).map_err(|e| Error::Remote(e.to_string())) + } +} + +impl Store for RemoteStore { + fn create_node(&mut self, input: NewNode) -> Result { + self.call_as("node.create", json!(input)) + } + + fn get_node(&self, id: &str) -> Result> { + self.call_as("node.get", json!({ "id": id })) + } + + fn update_node( + &mut self, + id: &str, + title: Option, + body: Option, + ) -> Result { + self.call_as( + "node.update", + json!({ "id": id, "title": title, "body": body }), + ) + } + + fn tombstone_node(&mut self, id: &str) -> Result<()> { + self.call("node.tombstone", json!({ "id": id })).map(|_| ()) + } + + fn create_task(&mut self, input: NewTask) -> Result { + self.call_as("task.create", json!(input)) + } + + fn get_task(&self, node_id: &str) -> Result> { + self.call_as("task.get", json!({ "id": node_id })) + } + + fn set_task_state(&mut self, node_id: &str, state: TaskState) -> Result { + self.call_as("task.set_state", json!({ "id": node_id, "state": state })) + } + + fn skip_recurrence(&mut self, node_id: &str) -> Result { + self.call_as("task.skip", json!({ "id": node_id })) + } + + fn set_task_attention(&mut self, node_id: &str, attention: Attention) -> Result { + self.call_as( + "task.set_attention", + json!({ "id": node_id, "attention": attention }), + ) + } + + fn next(&self, scope: Option<&str>, limit: usize) -> Result> { + self.call_as("next", json!({ "scope": scope, "limit": limit })) + } + + fn list( + &self, + scope: Option<&str>, + attention: Option, + include_blue: bool, + ) -> Result> { + self.call_as( + "list", + json!({ "scope": scope, "attention": attention, "include_blue": include_blue }), + ) + } + + fn health(&self) -> Result { + self.call_as("health", json!({})) + } + + fn search(&self, query: &str) -> Result> { + self.call_as("search", json!({ "query": query })) + } + + fn journal_open_or_create(&mut self, date: &str) -> Result { + self.call_as("journal.open_or_create", json!({ "date": date })) + } + + fn add_link(&mut self, src_id: &str, dst_id: &str, link_type: LinkType) -> Result { + self.call_as( + "links.add", + json!({ "src": src_id, "dst": dst_id, "link_type": link_type }), + ) + } + + fn outgoing_links(&self, id: &str) -> Result> { + self.call_as("links.outgoing", json!({ "id": id })) + } + + fn backlinks(&self, id: &str) -> Result> { + self.call_as("links.backlinks", json!({ "id": id })) + } + + fn log_append(&mut self, task_id: &str, text: &str) -> Result<()> { + self.call("log.append", json!({ "task_id": task_id, "text": text })) + .map(|_| ()) + } + + fn log_tail(&self, task_id: &str, n: usize) -> Result> { + self.call_as("log.tail", json!({ "task_id": task_id, "n": n })) + } + + fn export(&self, dir: &std::path::Path) -> Result { + // Export runs server-side, writing under `dir` on the server's host. + let out: Value = self.call("export", json!({ "path": dir.to_string_lossy() }))?; + out.get("count") + .and_then(Value::as_u64) + .map(|n| n as usize) + .ok_or_else(|| Error::Remote("export: missing count".into())) + } + + // --- sync primitives: not meaningful for a no-replica client --- + + fn ops_since(&self, _after: Option<&str>) -> Result> { + Err(Error::Remote( + "ops_since is unsupported in client mode".into(), + )) + } + + fn apply_op(&mut self, _op: &heph_core::Op) -> Result { + Err(Error::Remote( + "apply_op is unsupported in client mode".into(), + )) + } + + fn adopt_owner(&mut self, _canonical: &str) -> Result<()> { + // The server owns the data; a client has nothing local to re-own. + Ok(()) + } + + fn sync_state(&self, _peer: &str) -> Result { + Ok(SyncCursors::default()) + } + + fn record_sync( + &mut self, + _peer: &str, + _pushed: Option<&str>, + _pulled: Option<&str>, + ) -> Result<()> { + Ok(()) + } + + fn conflicts_list(&self) -> Result> { + self.call_as("conflicts.list", json!({})) + } + + fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()> { + self.call("conflicts.resolve", json!({ "id": id, "choice": choice })) + .map(|_| ()) + } +} diff --git a/crates/hephd/src/rpc.rs b/crates/hephd/src/rpc.rs index d6762b7..e1ca489 100644 --- a/crates/hephd/src/rpc.rs +++ b/crates/hephd/src/rpc.rs @@ -13,7 +13,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use heph_core::{Attention, NewNode, NewTask, Store, TaskState}; +use heph_core::{Attention, LinkType, NewNode, NewTask, Store, TaskState}; /// A JSON-RPC request line. #[derive(Debug, Deserialize)] @@ -168,6 +168,13 @@ struct LinkParams { id: String, } +#[derive(Deserialize)] +struct AddLinkParams { + src: String, + dst: String, + link_type: LinkType, +} + #[derive(Deserialize)] struct LogAppendParams { task_id: String, @@ -223,6 +230,10 @@ pub fn dispatch(store: &mut dyn Store, method: &str, params: Value) -> Result { + let p: IdParam = parse(params)?; + json!(store.get_task(&p.id)?) + } "task.set_state" => { let p: SetStateParams = parse(params)?; json!(store.set_task_state(&p.id, p.state)?) @@ -252,6 +263,10 @@ pub fn dispatch(store: &mut dyn Store, method: &str, params: Value) -> Result { + let p: AddLinkParams = parse(params)?; + json!(store.add_link(&p.src, &p.dst, p.link_type)?) + } "links.outgoing" => { let p: LinkParams = parse(params)?; json!(store.outgoing_links(&p.id)?) diff --git a/crates/hephd/src/server.rs b/crates/hephd/src/server.rs index 8e158f3..055b9a0 100644 --- a/crates/hephd/src/server.rs +++ b/crates/hephd/src/server.rs @@ -16,28 +16,28 @@ use serde_json::{json, Value}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; -use heph_core::{LocalStore, Store}; +use heph_core::Store; use crate::rpc::{self, Request, Response, RpcError, INTERNAL_ERROR, PARSE_ERROR}; -use crate::sync; +use crate::sync::{self, SharedStore}; /// The shared, cheaply-cloneable context each connection serves from. #[derive(Clone)] struct Ctx { - store: Arc>, + store: SharedStore, /// The hub this device syncs with, if it is a spoke (`local` + `hub_url`). hub_url: Option, http: reqwest::Client, } -/// A running daemon over a shared local store. +/// A running daemon over a shared store (any [`Store`] backend). pub struct Daemon { ctx: Ctx, } impl Daemon { - /// Wrap an opened store. - pub fn new(store: LocalStore) -> Daemon { + /// Wrap an opened store (a `LocalStore`, or a `client`-mode `RemoteStore`). + pub fn new(store: S) -> Daemon { Daemon { ctx: Ctx { store: Arc::new(Mutex::new(store)), @@ -55,7 +55,7 @@ impl Daemon { /// The shared store handle, for code that needs to reach the same store the /// daemon serves (the hub HTTP router and background sync, tech-spec §6.1). - pub fn store(&self) -> Arc> { + pub fn store(&self) -> SharedStore { self.ctx.store.clone() } diff --git a/crates/hephd/src/sync.rs b/crates/hephd/src/sync.rs index a559ac3..bbbbb27 100644 --- a/crates/hephd/src/sync.rs +++ b/crates/hephd/src/sync.rs @@ -7,6 +7,8 @@ //! //! - `POST /sync/push` — the spoke sends its new ops; the hub merges them. //! - `GET /sync/pull?after=` — the hub returns ops past the spoke's cursor. +//! - `POST /rpc` — the full daemon API ([`crate::rpc::dispatch`]) over HTTP, for +//! a no-replica `client`-mode [`crate::remote::RemoteStore`] to proxy against. //! //! Exchange is **incremental by HLC cursor** (`sync_state`, [`heph_core::SyncCursors`]): //! each side transfers only the tail it hasn't sent/seen. Merge is idempotent, @@ -22,11 +24,15 @@ use axum::http::StatusCode; use axum::routing::{get, post}; use axum::{Json, Router}; use serde::{Deserialize, Serialize}; +use serde_json::Value; -use heph_core::{LocalStore, Op, Store}; +use heph_core::{Op, Store}; -/// The shared store handle a hub serves from. -type SharedStore = Arc>; +use crate::rpc::{self, Response, RpcError, INTERNAL_ERROR}; + +/// The shared store a hub serves from — any [`Store`], so a `server` fronts a +/// `LocalStore` and (later modes) could front another backend. +pub type SharedStore = Arc>; /// A batch of ops in flight (push body / pull response). #[derive(Debug, Serialize, Deserialize)] @@ -50,13 +56,13 @@ pub struct SyncReport { /// an async worker, tech-spec §3). async fn with_store(store: &SharedStore, f: F) -> Result where - F: FnOnce(&mut LocalStore) -> heph_core::Result + Send + 'static, + F: FnOnce(&mut (dyn Store + Send)) -> heph_core::Result + Send + 'static, T: Send + 'static, { let store = store.clone(); let out = tokio::task::spawn_blocking(move || { let mut guard = store.lock().expect("store mutex poisoned"); - f(&mut guard) + f(&mut *guard) }) .await?; Ok(out?) @@ -65,7 +71,7 @@ where /// Apply a batch of ops in HLC order, returning how many were newly applied and /// the highest HLC seen (the new cursor position). fn apply_batch( - store: &mut LocalStore, + store: &mut (dyn Store + Send), mut ops: Vec, ) -> heph_core::Result<(usize, Option)> { ops.sort_by(|a, b| a.hlc.cmp(&b.hlc)); @@ -85,9 +91,42 @@ pub fn router(store: SharedStore) -> Router { Router::new() .route("/sync/pull", get(pull)) .route("/sync/push", post(push)) + .route("/rpc", post(rpc_call)) .with_state(store) } +/// One `POST /rpc` call: a method name + params, mirroring the unix-socket RPC. +#[derive(Debug, Deserialize)] +struct RpcCall { + method: String, + #[serde(default)] + params: Value, +} + +/// `POST /rpc` — run one [`rpc::dispatch`] call on the hub's store and return a +/// JSON-RPC-shaped [`Response`] (result xor error). Always HTTP 200; method +/// failures travel in the body so the client can reconstruct the error. +async fn rpc_call(State(store): State, Json(call): Json) -> Json { + let store = store.clone(); + let dispatched = tokio::task::spawn_blocking(move || { + let mut guard = store.lock().expect("store mutex poisoned"); + rpc::dispatch(&mut *guard, &call.method, call.params) + }) + .await; + let response = match dispatched { + Ok(Ok(value)) => Response::ok(Value::Null, value), + Ok(Err(rpc_err)) => Response::failed(Value::Null, rpc_err), + Err(join_err) => Response::failed( + Value::Null, + RpcError { + code: INTERNAL_ERROR, + message: format!("dispatch task failed: {join_err}"), + }, + ), + }; + Json(response) +} + #[derive(Debug, Deserialize)] struct PullQuery { /// HLC cursor — return ops strictly newer than this (absent ⇒ from the start). diff --git a/crates/hephd/tests/client_mode.rs b/crates/hephd/tests/client_mode.rs new file mode 100644 index 0000000..f79b91d --- /dev/null +++ b/crates/hephd/tests/client_mode.rs @@ -0,0 +1,97 @@ +//! Client mode over real HTTP (tech-spec §3.1, slice 9b). A server runs the hub +//! router (which includes `/rpc`) over a temp `LocalStore`; a `RemoteStore` +//! proxies the `Store` API to it and we assert the calls land on the server's +//! store. The server runs on its own runtime thread, so the test thread can use +//! the blocking client without nesting runtimes. + +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + +use heph_core::{Attention, Error, FixedClock, LocalStore, NewNode, NewTask, Store}; +use hephd::sync::{self, SharedStore}; +use hephd::RemoteStore; + +const NOW: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z + +/// Start the hub router over a temp `LocalStore` on an ephemeral port; return +/// its base URL. The server thread + temp dir live for the test's duration. +fn start_server() -> String { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + let dir = tempfile::tempdir().unwrap(); + let store = + LocalStore::open(dir.path().join("heph.db"), Box::new(FixedClock(NOW))).unwrap(); + let shared: SharedStore = Arc::new(Mutex::new(store)); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + tx.send(listener.local_addr().unwrap()).unwrap(); + let _keep = dir; // keep the temp DB alive while we serve + axum::serve(listener, sync::router(shared)).await.unwrap(); + }); + }); + let addr = rx.recv_timeout(Duration::from_secs(5)).unwrap(); + format!("http://{addr}") +} + +#[test] +fn remote_store_proxies_the_store_api() { + let base = start_server(); + let mut remote = RemoteStore::new(&base); + + // Create + read a node round-trips through HTTP. + let node = remote + .create_node(NewNode::doc("Roof", "shingles need work")) + .unwrap(); + let got = remote.get_node(&node.id).unwrap().expect("node on server"); + assert_eq!(got.title, "Roof"); + assert_eq!(got.body.as_deref(), Some("shingles need work")); + + // A missing node is Ok(None), not an error. + assert!(remote.get_node("does-not-exist").unwrap().is_none()); + + // A body update materializes server-side and is full-text searchable. + remote + .update_node(&node.id, None, Some("new cedar shingles".into())) + .unwrap(); + let hits = remote.search("cedar").unwrap(); + assert!(hits.iter().any(|h| h.id == node.id), "search missed update"); + + // Tasks proxy too, and land in the Organizational list. + let task = remote + .create_task(NewTask { + title: "Renew passport".into(), + attention: Some(Attention::Red), + ..Default::default() + }) + .unwrap(); + let fetched = remote + .get_task(&task.node_id) + .unwrap() + .expect("task on server"); + assert_eq!(fetched.node_id, task.node_id); + let listed = remote.list(None, None, true).unwrap(); + assert!( + listed.iter().any(|t| t.node_id == task.node_id), + "task missing from list" + ); + + // Read-only aggregates proxy and start empty. + assert!(remote.health().unwrap().active_count >= 1); + assert!(remote.conflicts_list().unwrap().is_empty()); +} + +#[test] +fn remote_store_preserves_not_found() { + let base = start_server(); + let mut remote = RemoteStore::new(&base); + let err = remote + .update_node("missing", Some("x".into()), None) + .unwrap_err(); + assert!(matches!(err, Error::NodeNotFound(_)), "got {err:?}"); +} diff --git a/docs/changelog.d/v1-prototype.feature.md b/docs/changelog.d/v1-prototype.feature.md index 92cbacb..6e7f14f 100644 --- a/docs/changelog.d/v1-prototype.feature.md +++ b/docs/changelog.d/v1-prototype.feature.md @@ -10,4 +10,5 @@ Begin the v1 prototype (Phase 1, tech-spec §11.1), built in TDD slices: - Sync engine, local-only (§12): real hybrid logical clock + persistent device `origin`; an append-only op-log per mutation; an idempotent, order-independent merge/apply engine — last-writer-wins task scalars (discards surfaced in a `conflicts` queue), OR-set links, monotonic tombstones. Two-replica convergence proven. - Body text CRDT (§5, §12, slice 8d): node bodies now merge through the `yrs` text CRDT (`body_crdt`) instead of last-writer-wins — whole-buffer writes are diffed into the doc and the yrs delta rides the op, so concurrent edits to different regions both survive and never enqueue a conflict. - Network sync over HTTP (§6.1, §12, slice 9a): `hephd --mode server` exposes a sync hub (`POST /sync/push`, `GET /sync/pull?after=`, axum) over the same store; `hephd --mode local --hub-url ` becomes a spoke that background-syncs its op-log with that hub (and on demand via the `sync.now`/`sync.status` RPC). Exchange is incremental by HLC cursor (`sync_state`) and idempotent. The merge engine is `heph-core`'s, unchanged. Unauthenticated/single-owner for now (auth lands with OIDC). `conflicts.list`/`conflicts.resolve` are now reachable over the daemon socket. +- Client mode (§3.1, slice 9b): `hephd --mode client --server-url ` runs with no local replica, proxying every store call to a server's `POST /rpc` endpoint (the full daemon API over HTTP). The daemon is now backend-agnostic (`local`/`server` front a `LocalStore`, `client` a `RemoteStore`), so surfaces see the same unix-socket API in every mode. - CI runs the Rust suite (fmt/clippy/test) via the project build hook. diff --git a/docs/reference/tech-spec.md b/docs/reference/tech-spec.md index 7c6eaef..1f0bdf6 100644 --- a/docs/reference/tech-spec.md +++ b/docs/reference/tech-spec.md @@ -326,7 +326,7 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi ## 14. Implementation status (Phase 1 tracker) -> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **100 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet). +> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **102 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet). **Done** @@ -335,18 +335,18 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi - ✅ **Recurrence (§4.4):** roll-forward in place — fresh checklist, logged occurrence, advance-skipping-misses; completion never carries forward (proptest). Per-task logs; `skip`. - ✅ **Ranking (§7):** pure two-stage filter + reorderable named dimensions; proptest total order. - ✅ **Daemon RPC (§6):** node.get/create/update/tombstone, task.create/set_state/set_attention/skip, next, list, health, journal.open_or_create, search, links.outgoing/backlinks, log.append/tail, export, conflicts.list/resolve, sync.now/sync.status. Line-delimited JSON-RPC over a unix socket; sync `Client`. (`ops_since`/`apply_op` are `Store` methods exchanged over the hub HTTP endpoint, not the unix socket.) -- ✅ **Runtime modes (§3.1) — `local` + `server`:** exclusive file-lock handoff via `LockGuard`; `--mode local|server`, `--hub-url`, `--http-addr`. `client` (no replica) is a later slice. +- ✅ **Runtime modes (§3.1) — `local` + `server` + `client`:** exclusive file-lock handoff via `LockGuard` (local/server only); `--mode local|server|client`, `--hub-url`, `--http-addr`, `--server-url`. - ✅ **Sync engine (§12) minus network:** HLC (clock-injected, monotonic) + persistent device `origin`; op-log per mutation; `apply_op` merge — **LWW** task scalars + titles with a **conflict queue**, **OR-set** links, monotonic tombstones, idempotent; two-replica convergence proven. `adopt_owner` = basic §13 canonical-owner adoption. - ✅ **Body text CRDT (§5, §12, slice 8d):** node bodies merge through the **`yrs`** text CRDT (`body_crdt` BLOB) instead of LWW. A device authors under a stable `client_id` derived from its `origin`; whole-buffer writes are diffed (common prefix/suffix, char-boundary safe) into the doc; the yrs delta rides the `node.create`/`node.set` op (`body_crdt` field) and `apply` merges it — concurrent disjoint edits both survive and never enqueue a conflict. - ✅ **Network sync (§6.1, §12, slice 9a):** **transport ratified = `axum` HTTP/JSON.** The hub (`server` mode) exposes `POST /sync/push` + `GET /sync/pull?after=` over the same store; a spoke (`local` + `hub_url`) runs `sync::sync_once` (pull→merge, then push) and background-syncs on a 30s interval. Incremental by HLC cursor (`sync_state`/`SyncCursors`); idempotent re-push is a no-op. Two spokes converge through a real-HTTP hub (incl. scalar conflict) in `tests/sync_http.rs`. **Unauthenticated for now, single-owner** (auth + per-user scoping is slice 10). +- ✅ **`client` mode + `RemoteStore` (§3.1, slice 9b):** a no-replica backend that proxies every `Store` call to a `server`'s `POST /rpc` (the full `dispatch`, over HTTP) via a **blocking** reqwest client — the online-only escape hatch. `Daemon` is now generic over `dyn Store + Send`, so the same unix-socket surface fronts either a `LocalStore` or a `RemoteStore`. Sync primitives are stubbed (a client has no op-log). Proven in `tests/client_mode.rs`. `dispatch` gained `task.get` + `links.add`. - ✅ **CLI (§1):** `heph` next/task/doc/get/export/search/journal. - ✅ **CI (§9):** `.forgejo/scripts/build` runs fmt/clippy/test (self-bootstrapping rustup). **Not yet done (resume order)** -1. ⏳ **`client` mode + `RemoteStore` (§3.1, slice 9b):** a no-replica spoke that proxies every `Store` method to a `server` over HTTP — the online-only escape hatch (borrowed box, CI, future web backend). Open: how the proxy maps the mutating `Store` surface; `sync.now` is N/A for `client`. -2. ⏳ **OIDC/Authentik auth (§13):** device-code flow, bearer token on the hub endpoint, full per-user isolation, adoption-with-deterministic-ids. -3. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner). +1. ⏳ **OIDC/Authentik auth (§13):** device-code flow, bearer token on the hub `POST /sync/*` + `/rpc` endpoints, full per-user isolation, adoption-with-deterministic-ids. +2. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner). ## Related