hephd: client mode + RemoteStore (sync 9b)
Some checks failed
Build / validate (pull_request) Failing after 4s

Add the online-only escape hatch — a no-replica daemon that proxies every
Store call to a server over HTTP (tech-spec §3.1).

- Daemon is now generic over the backing store (Arc<Mutex<dyn Store +
  Send>>), so the same unix-socket surface fronts either a LocalStore
  (local/server) or a RemoteStore (client). sync::router/sync_once and the
  Ctx follow suit.
- New POST /rpc route on the hub router runs the full rpc::dispatch over
  HTTP (result-xor-error body, always 200). dispatch gains task.get and
  links.add so the proxied API is complete.
- RemoteStore (hephd): implements heph_core::Store by forwarding each call
  to /rpc via a blocking reqwest client (Store is sync; the daemon only
  calls it from the blocking pool). Error::Remote for transport failures;
  NOT_FOUND is preserved as Error::NodeNotFound. Sync primitives are
  stubbed (a client keeps no op-log).
- main: --mode client + --server-url; client skips the file lock and opens
  no LocalStore.
- tests/client_mode.rs: a RemoteStore drives node/task/search/list/health
  against a real HTTP server, and not-found maps back correctly.

102 tests green; clippy -D warnings + fmt + prek clean. Next: OIDC auth.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Erich Blume 2026-06-01 15:29:28 -07:00
commit 5d54e913c2
13 changed files with 486 additions and 68 deletions

18
Cargo.lock generated
View file

@ -455,6 +455,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -463,6 +464,18 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-io"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]]
name = "futures-sink"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
[[package]]
name = "futures-task"
version = "0.3.32"
@ -476,7 +489,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-io",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"slab",
]
@ -1199,7 +1215,9 @@ checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3"
dependencies = [
"base64",
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"http-body-util",

View file

@ -37,6 +37,7 @@ axum = "0.8"
reqwest = { version = "0.13", default-features = false, features = [
"json",
"query",
"blocking",
] }
[profile.release]

View file

@ -10,7 +10,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision
## Status
**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. The **local system is feature-complete and replicas now sync through a hub over HTTP** — the offline-first everyday config (`local` + `hub_url`) converges end-to-end, with a `yrs` text-CRDT merging bodies. Remaining: the online-only `client` mode, auth, and the Neovim plugin. Built test-first (100 tests at last update). The canonical tracker is **tech-spec §14**.
**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. **All three runtime modes are implemented and replicas sync through a hub over HTTP** — the offline-first everyday config (`local` + `hub_url`) converges end-to-end with a `yrs` text-CRDT merging bodies, and `client` mode proxies to a server with no local replica. Remaining: auth and the Neovim plugin. Built test-first (102 tests at last update). The canonical tracker is **tech-spec §14**.
| Area | State |
|---|---|
@ -21,8 +21,8 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision
| Sync engine — HLC, op-log, converging merge + conflict queue (no network yet) | ✅ done |
| yrs text-CRDT for body merge | ✅ done |
| `server` (hub) mode + spoke push/pull sync over HTTP (axum) | ✅ done |
| `client` mode + `RemoteStore` (online-only, no replica) | ⏳ next |
| OIDC/Authentik auth + per-user isolation | ⏳ |
| `client` mode + `RemoteStore` (online-only, no replica) | ✅ done |
| OIDC/Authentik auth + per-user isolation | ⏳ next |
| `heph.nvim` (primary surface) | ⏳ |
## Architecture
@ -30,7 +30,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision
A Cargo workspace, layered so the same core runs from a laptop to a hub:
- **`crates/heph-core`** — the library: data model, the `Store` trait + SQLite store, markdown parsing/extraction, recurrence, the "what is next?" engine, and the sync engine (op-log, hybrid logical clocks, CRDT/LWW merge, conflict detection). Synchronous and clock-injected (no ambient wall-clock reads) so ranking and merge are deterministic.
- **`crates/hephd`** — the per-device daemon. One binary, three modes — **`local`** (own SQLite replica; a syncing spoke when given `--hub-url`), **`server`** (also the sync hub: an HTTP endpoint others sync against), **`client`** *(planned)* (thin, remote, no replica) — selected by configuration via a targetable `Store` backend. Surfaces connect to it over a unix socket; it owns the DB handle and background sync.
- **`crates/hephd`** — the per-device daemon. One binary, three modes — **`local`** (own SQLite replica; a syncing spoke when given `--hub-url`), **`server`** (also the sync hub: an HTTP endpoint others sync against), **`client`** (thin, remote, no replica — proxies to a `--server-url`) — selected by configuration via a targetable `Store` backend. Surfaces connect to it over a unix socket; it owns the DB handle and background sync.
- **`crates/heph`** — the CLI: a thin client of the daemon (no direct DB access).
- **`heph.nvim/`** *(planned)* — the Neovim plugin, the primary editing/agenda surface.
@ -83,7 +83,7 @@ mise run ai-docs # docs AI agents read firs
```
./Cargo.toml # workspace manifest
./crates/heph-core/ # core library: model, store, extraction, recurrence, ranking, sync
./crates/hephd/ # daemon: local + server (hub) modes — unix-socket RPC + HTTP sync; client planned
./crates/hephd/ # daemon: local/server/client modes — unix-socket RPC + HTTP sync/rpc
./crates/heph/ # CLI: thin client of the daemon
./heph.nvim/ # Neovim plugin (planned)
./docs/ # Diataxis docs (design, tech-spec, how-to), Quartz config

View file

@ -22,6 +22,11 @@ pub enum Error {
/// A value in the database did not match the expected shape.
#[error("data integrity: {0}")]
Integrity(String),
/// A remote backend (a `RemoteStore` in `client` mode) failed or returned an
/// error response (tech-spec §3.1).
#[error("remote: {0}")]
Remote(String),
}
/// Convenience result alias.

View file

@ -10,6 +10,7 @@
pub mod client;
pub mod clock;
pub mod lock;
pub mod remote;
pub mod rpc;
pub mod server;
pub mod sync;
@ -19,6 +20,7 @@ use std::path::PathBuf;
pub use client::Client;
pub use clock::SystemClock;
pub use lock::LockGuard;
pub use remote::RemoteStore;
pub use server::Daemon;
pub use sync::{sync_once, SyncReport};

View file

@ -14,7 +14,9 @@ use clap::{Parser, ValueEnum};
use tokio::net::{TcpListener, UnixListener};
use heph_core::LocalStore;
use hephd::{default_db_path, default_socket_path, sync, Daemon, LockGuard, SystemClock};
use hephd::{
default_db_path, default_socket_path, sync, Daemon, LockGuard, RemoteStore, SystemClock,
};
/// How often a spoke background-syncs with its hub.
const SYNC_INTERVAL: Duration = Duration::from_secs(30);
@ -27,6 +29,8 @@ enum Mode {
Local,
/// Also a sync hub: exposes the authenticated network endpoint over HTTP.
Server,
/// No local replica; proxy every call to a `--server-url` (online-only).
Client,
}
/// The Hephaestus per-device daemon.
@ -52,6 +56,10 @@ struct Cli {
/// Address for the hub HTTP endpoint (server mode only).
#[arg(long)]
http_addr: Option<String>,
/// Server to proxy to (client mode only; required there).
#[arg(long)]
server_url: Option<String>,
}
#[tokio::main]
@ -64,56 +72,71 @@ async fn main() -> Result<()> {
.init();
let cli = Cli::parse();
let db = cli.db.unwrap_or_else(default_db_path);
let socket = cli.socket.unwrap_or_else(default_socket_path);
if let Some(parent) = db.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating store dir {}", parent.display()))?;
}
let socket = cli.socket.clone().unwrap_or_else(default_socket_path);
if let Some(parent) = socket.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating socket dir {}", parent.display()))?;
}
// Take the exclusive lock before opening the store (tech-spec §3.1).
let _lock = LockGuard::acquire(&db)?;
let store = LocalStore::open(&db, Box::new(SystemClock))?;
let daemon = Daemon::new(store).with_hub(cli.hub_url.clone());
// server mode: expose the hub HTTP endpoint over the same store.
if cli.mode == Mode::Server {
let addr = cli
.http_addr
.clone()
.unwrap_or_else(|| DEFAULT_HTTP_ADDR.to_string());
let app = sync::router(daemon.store());
let listener = TcpListener::bind(&addr)
.await
.with_context(|| format!("binding hub HTTP endpoint {addr}"))?;
tracing::info!(%addr, "hub HTTP endpoint listening");
tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app).await {
tracing::error!("hub HTTP endpoint stopped: {e}");
// Build the daemon for the chosen mode. `local`/`server` own the file (and
// hold its lock for the process's life); `client` keeps no replica.
let (_lock, daemon) = match cli.mode {
Mode::Client => {
let server_url = cli
.server_url
.clone()
.context("client mode requires --server-url")?;
tracing::info!(%server_url, "client mode: proxying to server (no local replica)");
(None, Daemon::new(RemoteStore::new(&server_url)))
}
Mode::Local | Mode::Server => {
let db = cli.db.clone().unwrap_or_else(default_db_path);
if let Some(parent) = db.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating store dir {}", parent.display()))?;
}
});
}
// Take the exclusive lock before opening the store (tech-spec §3.1).
let lock = LockGuard::acquire(&db)?;
let store = LocalStore::open(&db, Box::new(SystemClock))?;
let daemon = Daemon::new(store).with_hub(cli.hub_url.clone());
// spoke: background-sync the op-log with the configured hub.
if let Some(hub) = cli.hub_url.clone() {
let store = daemon.store();
tokio::spawn(async move {
let http = reqwest::Client::new();
let mut tick = tokio::time::interval(SYNC_INTERVAL);
loop {
tick.tick().await;
match hephd::sync_once(store.clone(), &hub, &http).await {
Ok(report) => tracing::debug!(?report, "background sync"),
Err(e) => tracing::warn!("background sync failed: {e}"),
}
// server mode: expose the hub HTTP endpoint over the same store.
if cli.mode == Mode::Server {
let addr = cli
.http_addr
.clone()
.unwrap_or_else(|| DEFAULT_HTTP_ADDR.to_string());
let app = sync::router(daemon.store());
let http_listener = TcpListener::bind(&addr)
.await
.with_context(|| format!("binding hub HTTP endpoint {addr}"))?;
tracing::info!(%addr, "hub HTTP endpoint listening");
tokio::spawn(async move {
if let Err(e) = axum::serve(http_listener, app).await {
tracing::error!("hub HTTP endpoint stopped: {e}");
}
});
}
});
}
// spoke: background-sync the op-log with the configured hub.
if let Some(hub) = cli.hub_url.clone() {
let store = daemon.store();
tokio::spawn(async move {
let http = reqwest::Client::new();
let mut tick = tokio::time::interval(SYNC_INTERVAL);
loop {
tick.tick().await;
match hephd::sync_once(store.clone(), &hub, &http).await {
Ok(report) => tracing::debug!(?report, "background sync"),
Err(e) => tracing::warn!("background sync failed: {e}"),
}
}
});
}
(Some(lock), daemon)
}
};
// Replace any stale socket from a previous run, then bind.
if socket.exists() {
@ -123,6 +146,6 @@ async fn main() -> Result<()> {
let listener = UnixListener::bind(&socket)
.with_context(|| format!("binding socket {}", socket.display()))?;
tracing::info!(db = %db.display(), socket = %socket.display(), mode = ?cli.mode, "hephd listening");
tracing::info!(socket = %socket.display(), mode = ?cli.mode, "hephd listening");
daemon.serve(listener).await
}

217
crates/hephd/src/remote.rs Normal file
View file

@ -0,0 +1,217 @@
//! `RemoteStore` — the `client`-mode backend (tech-spec §3.1).
//!
//! A `client` keeps **no local replica**: it proxies every [`Store`] call to a
//! `server`'s `POST /rpc` endpoint (the same [`crate::rpc::dispatch`] the unix
//! socket runs). Since `Store` is synchronous, this uses a **blocking** reqwest
//! client; the daemon only ever calls store methods from its blocking pool, so
//! that is safe.
//!
//! With no op-log of its own, the sync primitives (`ops_since`/`apply_op`/
//! `sync_state`/`record_sync`) are not meaningful here — a `client` never
//! background-syncs, it reads and writes the hub live. They are stubbed
//! accordingly; the daemon never invokes them in this mode.
use serde::de::DeserializeOwned;
use serde_json::{json, Value};
use heph_core::{
Attention, Conflict, Error, Health, Link, LinkType, NewNode, NewTask, Node, Result, Store,
SyncCursors, Task, TaskState,
};
use crate::rpc::{Response, NOT_FOUND};
/// A no-replica store that proxies to a `server` over HTTP.
pub struct RemoteStore {
base: String,
http: reqwest::blocking::Client,
}
impl RemoteStore {
/// Point a client at `server_url` (e.g. `http://hub.example:8787`).
pub fn new(server_url: &str) -> RemoteStore {
RemoteStore {
base: server_url.trim_end_matches('/').to_string(),
http: reqwest::blocking::Client::new(),
}
}
/// Issue one `/rpc` call, returning the raw `result` value.
fn call(&self, method: &str, params: Value) -> Result<Value> {
let response: Response = self
.http
.post(format!("{}/rpc", self.base))
.json(&json!({ "method": method, "params": params }))
.send()
.and_then(reqwest::blocking::Response::error_for_status)
.map_err(|e| Error::Remote(e.to_string()))?
.json()
.map_err(|e| Error::Remote(e.to_string()))?;
if let Some(err) = response.error {
// Preserve "not found" so callers keep the typed contract.
return Err(if err.code == NOT_FOUND {
Error::NodeNotFound(err.message)
} else {
Error::Remote(err.message)
});
}
Ok(response.result.unwrap_or(Value::Null))
}
/// Call `method` and decode the result into `T`.
fn call_as<T: DeserializeOwned>(&self, method: &str, params: Value) -> Result<T> {
let value = self.call(method, params)?;
serde_json::from_value(value).map_err(|e| Error::Remote(e.to_string()))
}
}
impl Store for RemoteStore {
fn create_node(&mut self, input: NewNode) -> Result<Node> {
self.call_as("node.create", json!(input))
}
fn get_node(&self, id: &str) -> Result<Option<Node>> {
self.call_as("node.get", json!({ "id": id }))
}
fn update_node(
&mut self,
id: &str,
title: Option<String>,
body: Option<String>,
) -> Result<Node> {
self.call_as(
"node.update",
json!({ "id": id, "title": title, "body": body }),
)
}
fn tombstone_node(&mut self, id: &str) -> Result<()> {
self.call("node.tombstone", json!({ "id": id })).map(|_| ())
}
fn create_task(&mut self, input: NewTask) -> Result<Task> {
self.call_as("task.create", json!(input))
}
fn get_task(&self, node_id: &str) -> Result<Option<Task>> {
self.call_as("task.get", json!({ "id": node_id }))
}
fn set_task_state(&mut self, node_id: &str, state: TaskState) -> Result<Task> {
self.call_as("task.set_state", json!({ "id": node_id, "state": state }))
}
fn skip_recurrence(&mut self, node_id: &str) -> Result<Task> {
self.call_as("task.skip", json!({ "id": node_id }))
}
fn set_task_attention(&mut self, node_id: &str, attention: Attention) -> Result<Task> {
self.call_as(
"task.set_attention",
json!({ "id": node_id, "attention": attention }),
)
}
fn next(&self, scope: Option<&str>, limit: usize) -> Result<Vec<heph_core::RankedTask>> {
self.call_as("next", json!({ "scope": scope, "limit": limit }))
}
fn list(
&self,
scope: Option<&str>,
attention: Option<Attention>,
include_blue: bool,
) -> Result<Vec<Task>> {
self.call_as(
"list",
json!({ "scope": scope, "attention": attention, "include_blue": include_blue }),
)
}
fn health(&self) -> Result<Health> {
self.call_as("health", json!({}))
}
fn search(&self, query: &str) -> Result<Vec<Node>> {
self.call_as("search", json!({ "query": query }))
}
fn journal_open_or_create(&mut self, date: &str) -> Result<Node> {
self.call_as("journal.open_or_create", json!({ "date": date }))
}
fn add_link(&mut self, src_id: &str, dst_id: &str, link_type: LinkType) -> Result<Link> {
self.call_as(
"links.add",
json!({ "src": src_id, "dst": dst_id, "link_type": link_type }),
)
}
fn outgoing_links(&self, id: &str) -> Result<Vec<Link>> {
self.call_as("links.outgoing", json!({ "id": id }))
}
fn backlinks(&self, id: &str) -> Result<Vec<Link>> {
self.call_as("links.backlinks", json!({ "id": id }))
}
fn log_append(&mut self, task_id: &str, text: &str) -> Result<()> {
self.call("log.append", json!({ "task_id": task_id, "text": text }))
.map(|_| ())
}
fn log_tail(&self, task_id: &str, n: usize) -> Result<Vec<String>> {
self.call_as("log.tail", json!({ "task_id": task_id, "n": n }))
}
fn export(&self, dir: &std::path::Path) -> Result<usize> {
// Export runs server-side, writing under `dir` on the server's host.
let out: Value = self.call("export", json!({ "path": dir.to_string_lossy() }))?;
out.get("count")
.and_then(Value::as_u64)
.map(|n| n as usize)
.ok_or_else(|| Error::Remote("export: missing count".into()))
}
// --- sync primitives: not meaningful for a no-replica client ---
fn ops_since(&self, _after: Option<&str>) -> Result<Vec<heph_core::Op>> {
Err(Error::Remote(
"ops_since is unsupported in client mode".into(),
))
}
fn apply_op(&mut self, _op: &heph_core::Op) -> Result<bool> {
Err(Error::Remote(
"apply_op is unsupported in client mode".into(),
))
}
fn adopt_owner(&mut self, _canonical: &str) -> Result<()> {
// The server owns the data; a client has nothing local to re-own.
Ok(())
}
fn sync_state(&self, _peer: &str) -> Result<SyncCursors> {
Ok(SyncCursors::default())
}
fn record_sync(
&mut self,
_peer: &str,
_pushed: Option<&str>,
_pulled: Option<&str>,
) -> Result<()> {
Ok(())
}
fn conflicts_list(&self) -> Result<Vec<Conflict>> {
self.call_as("conflicts.list", json!({}))
}
fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()> {
self.call("conflicts.resolve", json!({ "id": id, "choice": choice }))
.map(|_| ())
}
}

View file

@ -13,7 +13,7 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use heph_core::{Attention, NewNode, NewTask, Store, TaskState};
use heph_core::{Attention, LinkType, NewNode, NewTask, Store, TaskState};
/// A JSON-RPC request line.
#[derive(Debug, Deserialize)]
@ -168,6 +168,13 @@ struct LinkParams {
id: String,
}
#[derive(Deserialize)]
struct AddLinkParams {
src: String,
dst: String,
link_type: LinkType,
}
#[derive(Deserialize)]
struct LogAppendParams {
task_id: String,
@ -223,6 +230,10 @@ pub fn dispatch(store: &mut dyn Store, method: &str, params: Value) -> Result<Va
let p: NewTask = parse(params)?;
json!(store.create_task(p)?)
}
"task.get" => {
let p: IdParam = parse(params)?;
json!(store.get_task(&p.id)?)
}
"task.set_state" => {
let p: SetStateParams = parse(params)?;
json!(store.set_task_state(&p.id, p.state)?)
@ -252,6 +263,10 @@ pub fn dispatch(store: &mut dyn Store, method: &str, params: Value) -> Result<Va
let p: JournalParams = parse(params)?;
json!(store.journal_open_or_create(&p.date)?)
}
"links.add" => {
let p: AddLinkParams = parse(params)?;
json!(store.add_link(&p.src, &p.dst, p.link_type)?)
}
"links.outgoing" => {
let p: LinkParams = parse(params)?;
json!(store.outgoing_links(&p.id)?)

View file

@ -16,28 +16,28 @@ use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use heph_core::{LocalStore, Store};
use heph_core::Store;
use crate::rpc::{self, Request, Response, RpcError, INTERNAL_ERROR, PARSE_ERROR};
use crate::sync;
use crate::sync::{self, SharedStore};
/// The shared, cheaply-cloneable context each connection serves from.
#[derive(Clone)]
struct Ctx {
store: Arc<Mutex<LocalStore>>,
store: SharedStore,
/// The hub this device syncs with, if it is a spoke (`local` + `hub_url`).
hub_url: Option<String>,
http: reqwest::Client,
}
/// A running daemon over a shared local store.
/// A running daemon over a shared store (any [`Store`] backend).
pub struct Daemon {
ctx: Ctx,
}
impl Daemon {
/// Wrap an opened store.
pub fn new(store: LocalStore) -> Daemon {
/// Wrap an opened store (a `LocalStore`, or a `client`-mode `RemoteStore`).
pub fn new<S: Store + Send + 'static>(store: S) -> Daemon {
Daemon {
ctx: Ctx {
store: Arc::new(Mutex::new(store)),
@ -55,7 +55,7 @@ impl Daemon {
/// The shared store handle, for code that needs to reach the same store the
/// daemon serves (the hub HTTP router and background sync, tech-spec §6.1).
pub fn store(&self) -> Arc<Mutex<LocalStore>> {
pub fn store(&self) -> SharedStore {
self.ctx.store.clone()
}

View file

@ -7,6 +7,8 @@
//!
//! - `POST /sync/push` — the spoke sends its new ops; the hub merges them.
//! - `GET /sync/pull?after=<hlc>` — the hub returns ops past the spoke's cursor.
//! - `POST /rpc` — the full daemon API ([`crate::rpc::dispatch`]) over HTTP, for
//! a no-replica `client`-mode [`crate::remote::RemoteStore`] to proxy against.
//!
//! Exchange is **incremental by HLC cursor** (`sync_state`, [`heph_core::SyncCursors`]):
//! each side transfers only the tail it hasn't sent/seen. Merge is idempotent,
@ -22,11 +24,15 @@ use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use heph_core::{LocalStore, Op, Store};
use heph_core::{Op, Store};
/// The shared store handle a hub serves from.
type SharedStore = Arc<Mutex<LocalStore>>;
use crate::rpc::{self, Response, RpcError, INTERNAL_ERROR};
/// The shared store a hub serves from — any [`Store`], so a `server` fronts a
/// `LocalStore` and (later modes) could front another backend.
pub type SharedStore = Arc<Mutex<dyn Store + Send>>;
/// A batch of ops in flight (push body / pull response).
#[derive(Debug, Serialize, Deserialize)]
@ -50,13 +56,13 @@ pub struct SyncReport {
/// an async worker, tech-spec §3).
async fn with_store<T, F>(store: &SharedStore, f: F) -> Result<T>
where
F: FnOnce(&mut LocalStore) -> heph_core::Result<T> + Send + 'static,
F: FnOnce(&mut (dyn Store + Send)) -> heph_core::Result<T> + Send + 'static,
T: Send + 'static,
{
let store = store.clone();
let out = tokio::task::spawn_blocking(move || {
let mut guard = store.lock().expect("store mutex poisoned");
f(&mut guard)
f(&mut *guard)
})
.await?;
Ok(out?)
@ -65,7 +71,7 @@ where
/// Apply a batch of ops in HLC order, returning how many were newly applied and
/// the highest HLC seen (the new cursor position).
fn apply_batch(
store: &mut LocalStore,
store: &mut (dyn Store + Send),
mut ops: Vec<Op>,
) -> heph_core::Result<(usize, Option<String>)> {
ops.sort_by(|a, b| a.hlc.cmp(&b.hlc));
@ -85,9 +91,42 @@ pub fn router(store: SharedStore) -> Router {
Router::new()
.route("/sync/pull", get(pull))
.route("/sync/push", post(push))
.route("/rpc", post(rpc_call))
.with_state(store)
}
/// One `POST /rpc` call: a method name + params, mirroring the unix-socket RPC.
#[derive(Debug, Deserialize)]
struct RpcCall {
method: String,
#[serde(default)]
params: Value,
}
/// `POST /rpc` — run one [`rpc::dispatch`] call on the hub's store and return a
/// JSON-RPC-shaped [`Response`] (result xor error). Always HTTP 200; method
/// failures travel in the body so the client can reconstruct the error.
async fn rpc_call(State(store): State<SharedStore>, Json(call): Json<RpcCall>) -> Json<Response> {
let store = store.clone();
let dispatched = tokio::task::spawn_blocking(move || {
let mut guard = store.lock().expect("store mutex poisoned");
rpc::dispatch(&mut *guard, &call.method, call.params)
})
.await;
let response = match dispatched {
Ok(Ok(value)) => Response::ok(Value::Null, value),
Ok(Err(rpc_err)) => Response::failed(Value::Null, rpc_err),
Err(join_err) => Response::failed(
Value::Null,
RpcError {
code: INTERNAL_ERROR,
message: format!("dispatch task failed: {join_err}"),
},
),
};
Json(response)
}
#[derive(Debug, Deserialize)]
struct PullQuery {
/// HLC cursor — return ops strictly newer than this (absent ⇒ from the start).

View file

@ -0,0 +1,97 @@
//! Client mode over real HTTP (tech-spec §3.1, slice 9b). A server runs the hub
//! router (which includes `/rpc`) over a temp `LocalStore`; a `RemoteStore`
//! proxies the `Store` API to it and we assert the calls land on the server's
//! store. The server runs on its own runtime thread, so the test thread can use
//! the blocking client without nesting runtimes.
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use heph_core::{Attention, Error, FixedClock, LocalStore, NewNode, NewTask, Store};
use hephd::sync::{self, SharedStore};
use hephd::RemoteStore;
const NOW: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z
/// Start the hub router over a temp `LocalStore` on an ephemeral port; return
/// its base URL. The server thread + temp dir live for the test's duration.
fn start_server() -> String {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
let dir = tempfile::tempdir().unwrap();
let store =
LocalStore::open(dir.path().join("heph.db"), Box::new(FixedClock(NOW))).unwrap();
let shared: SharedStore = Arc::new(Mutex::new(store));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
tx.send(listener.local_addr().unwrap()).unwrap();
let _keep = dir; // keep the temp DB alive while we serve
axum::serve(listener, sync::router(shared)).await.unwrap();
});
});
let addr = rx.recv_timeout(Duration::from_secs(5)).unwrap();
format!("http://{addr}")
}
#[test]
fn remote_store_proxies_the_store_api() {
let base = start_server();
let mut remote = RemoteStore::new(&base);
// Create + read a node round-trips through HTTP.
let node = remote
.create_node(NewNode::doc("Roof", "shingles need work"))
.unwrap();
let got = remote.get_node(&node.id).unwrap().expect("node on server");
assert_eq!(got.title, "Roof");
assert_eq!(got.body.as_deref(), Some("shingles need work"));
// A missing node is Ok(None), not an error.
assert!(remote.get_node("does-not-exist").unwrap().is_none());
// A body update materializes server-side and is full-text searchable.
remote
.update_node(&node.id, None, Some("new cedar shingles".into()))
.unwrap();
let hits = remote.search("cedar").unwrap();
assert!(hits.iter().any(|h| h.id == node.id), "search missed update");
// Tasks proxy too, and land in the Organizational list.
let task = remote
.create_task(NewTask {
title: "Renew passport".into(),
attention: Some(Attention::Red),
..Default::default()
})
.unwrap();
let fetched = remote
.get_task(&task.node_id)
.unwrap()
.expect("task on server");
assert_eq!(fetched.node_id, task.node_id);
let listed = remote.list(None, None, true).unwrap();
assert!(
listed.iter().any(|t| t.node_id == task.node_id),
"task missing from list"
);
// Read-only aggregates proxy and start empty.
assert!(remote.health().unwrap().active_count >= 1);
assert!(remote.conflicts_list().unwrap().is_empty());
}
#[test]
fn remote_store_preserves_not_found() {
let base = start_server();
let mut remote = RemoteStore::new(&base);
let err = remote
.update_node("missing", Some("x".into()), None)
.unwrap_err();
assert!(matches!(err, Error::NodeNotFound(_)), "got {err:?}");
}

View file

@ -10,4 +10,5 @@ Begin the v1 prototype (Phase 1, tech-spec §11.1), built in TDD slices:
- Sync engine, local-only (§12): real hybrid logical clock + persistent device `origin`; an append-only op-log per mutation; an idempotent, order-independent merge/apply engine — last-writer-wins task scalars (discards surfaced in a `conflicts` queue), OR-set links, monotonic tombstones. Two-replica convergence proven.
- Body text CRDT (§5, §12, slice 8d): node bodies now merge through the `yrs` text CRDT (`body_crdt`) instead of last-writer-wins — whole-buffer writes are diffed into the doc and the yrs delta rides the op, so concurrent edits to different regions both survive and never enqueue a conflict.
- Network sync over HTTP (§6.1, §12, slice 9a): `hephd --mode server` exposes a sync hub (`POST /sync/push`, `GET /sync/pull?after=<hlc>`, axum) over the same store; `hephd --mode local --hub-url <url>` becomes a spoke that background-syncs its op-log with that hub (and on demand via the `sync.now`/`sync.status` RPC). Exchange is incremental by HLC cursor (`sync_state`) and idempotent. The merge engine is `heph-core`'s, unchanged. Unauthenticated/single-owner for now (auth lands with OIDC). `conflicts.list`/`conflicts.resolve` are now reachable over the daemon socket.
- Client mode (§3.1, slice 9b): `hephd --mode client --server-url <url>` runs with no local replica, proxying every store call to a server's `POST /rpc` endpoint (the full daemon API over HTTP). The daemon is now backend-agnostic (`local`/`server` front a `LocalStore`, `client` a `RemoteStore`), so surfaces see the same unix-socket API in every mode.
- CI runs the Rust suite (fmt/clippy/test) via the project build hook.

View file

@ -326,7 +326,7 @@ See [[design]] §5§7 for the constraints later phases impose on present choi
## 14. Implementation status (Phase 1 tracker)
> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **100 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet).
> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **102 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet).
**Done**
@ -335,18 +335,18 @@ See [[design]] §5§7 for the constraints later phases impose on present choi
- ✅ **Recurrence (§4.4):** roll-forward in place — fresh checklist, logged occurrence, advance-skipping-misses; completion never carries forward (proptest). Per-task logs; `skip`.
- ✅ **Ranking (§7):** pure two-stage filter + reorderable named dimensions; proptest total order.
- ✅ **Daemon RPC (§6):** node.get/create/update/tombstone, task.create/set_state/set_attention/skip, next, list, health, journal.open_or_create, search, links.outgoing/backlinks, log.append/tail, export, conflicts.list/resolve, sync.now/sync.status. Line-delimited JSON-RPC over a unix socket; sync `Client`. (`ops_since`/`apply_op` are `Store` methods exchanged over the hub HTTP endpoint, not the unix socket.)
- ✅ **Runtime modes (§3.1) — `local` + `server`:** exclusive file-lock handoff via `LockGuard`; `--mode local|server`, `--hub-url`, `--http-addr`. `client` (no replica) is a later slice.
- ✅ **Runtime modes (§3.1) — `local` + `server` + `client`:** exclusive file-lock handoff via `LockGuard` (local/server only); `--mode local|server|client`, `--hub-url`, `--http-addr`, `--server-url`.
- ✅ **Sync engine (§12) minus network:** HLC (clock-injected, monotonic) + persistent device `origin`; op-log per mutation; `apply_op` merge — **LWW** task scalars + titles with a **conflict queue**, **OR-set** links, monotonic tombstones, idempotent; two-replica convergence proven. `adopt_owner` = basic §13 canonical-owner adoption.
- ✅ **Body text CRDT (§5, §12, slice 8d):** node bodies merge through the **`yrs`** text CRDT (`body_crdt` BLOB) instead of LWW. A device authors under a stable `client_id` derived from its `origin`; whole-buffer writes are diffed (common prefix/suffix, char-boundary safe) into the doc; the yrs delta rides the `node.create`/`node.set` op (`body_crdt` field) and `apply` merges it — concurrent disjoint edits both survive and never enqueue a conflict.
- ✅ **Network sync (§6.1, §12, slice 9a):** **transport ratified = `axum` HTTP/JSON.** The hub (`server` mode) exposes `POST /sync/push` + `GET /sync/pull?after=<hlc>` over the same store; a spoke (`local` + `hub_url`) runs `sync::sync_once` (pull→merge, then push) and background-syncs on a 30s interval. Incremental by HLC cursor (`sync_state`/`SyncCursors`); idempotent re-push is a no-op. Two spokes converge through a real-HTTP hub (incl. scalar conflict) in `tests/sync_http.rs`. **Unauthenticated for now, single-owner** (auth + per-user scoping is slice 10).
- ✅ **`client` mode + `RemoteStore` (§3.1, slice 9b):** a no-replica backend that proxies every `Store` call to a `server`'s `POST /rpc` (the full `dispatch`, over HTTP) via a **blocking** reqwest client — the online-only escape hatch. `Daemon` is now generic over `dyn Store + Send`, so the same unix-socket surface fronts either a `LocalStore` or a `RemoteStore`. Sync primitives are stubbed (a client has no op-log). Proven in `tests/client_mode.rs`. `dispatch` gained `task.get` + `links.add`.
- ✅ **CLI (§1):** `heph` next/task/doc/get/export/search/journal.
- ✅ **CI (§9):** `.forgejo/scripts/build` runs fmt/clippy/test (self-bootstrapping rustup).
**Not yet done (resume order)**
1. ⏳ **`client` mode + `RemoteStore` (§3.1, slice 9b):** a no-replica spoke that proxies every `Store` method to a `server` over HTTP — the online-only escape hatch (borrowed box, CI, future web backend). Open: how the proxy maps the mutating `Store` surface; `sync.now` is N/A for `client`.
2. ⏳ **OIDC/Authentik auth (§13):** device-code flow, bearer token on the hub endpoint, full per-user isolation, adoption-with-deterministic-ids.
3. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner).
1. ⏳ **OIDC/Authentik auth (§13):** device-code flow, bearer token on the hub `POST /sync/*` + `/rpc` endpoints, full per-user isolation, adoption-with-deterministic-ids.
2. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner).
## Related