From 455f172a548ecdbe83285f5e74b379ca81ac676d Mon Sep 17 00:00:00 2001 From: Erich Blume Date: Mon, 1 Jun 2026 09:06:17 -0700 Subject: [PATCH] heph-core: body text-CRDT via yrs (sync 8d) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace last-writer-wins for node bodies with the yrs text CRDT, so concurrent edits to different regions of a body merge instead of one clobbering the other (tech-spec §5, §12). - New crate::crdt module wraps yrs: a device authors under a stable client_id derived from its sync origin; a whole-buffer write is diffed (common prefix/suffix, char-boundary safe) into the doc and the yrs delta is captured; merge is commutative/idempotent. - nodes::create/update/journal maintain the body_crdt BLOB and put the yrs delta in the node.create/node.set op payload (body_crdt field). Recurrence's local checklist reset goes through the same path to keep body and body_crdt consistent (still records no op, as before). - apply::node_upsert merges the body delta through the CRDT regardless of HLC order and drops body-conflict recording; titles + task scalars stay LWW with the conflict queue. - convergence test now asserts disjoint concurrent body edits both survive and enqueue no conflict. 97 tests green; clippy -D warnings + fmt + prek clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 157 +++++++++++++++++ Cargo.toml | 1 + README.md | 6 +- crates/heph-core/Cargo.toml | 1 + crates/heph-core/src/crdt.rs | 210 +++++++++++++++++++++++ crates/heph-core/src/lib.rs | 1 + crates/heph-core/src/oplog.rs | 6 +- crates/heph-core/src/sqlite/apply.rs | 91 ++++++---- crates/heph-core/src/sqlite/nodes.rs | 138 +++++++++++---- crates/heph-core/src/sqlite/tasks.rs | 6 +- crates/heph-core/tests/convergence.rs | 29 +++- docs/changelog.d/v1-prototype.feature.md | 2 + docs/reference/tech-spec.md | 14 +- 13 files changed, 576 insertions(+), 86 deletions(-) create mode 100644 crates/heph-core/src/crdt.rs diff --git a/Cargo.lock b/Cargo.lock index ddad7a8..2241351 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,6 +88,37 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "arc-swap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" +dependencies = [ + "rustversion", +] + +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.1" @@ -224,12 +255,41 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "errno" version = "0.3.14" @@ -240,6 +300,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -257,6 +338,9 @@ name = "fastrand" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +dependencies = [ + "getrandom", +] [[package]] name = "find-msvc-tools" @@ -311,9 +395,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", ] [[package]] @@ -367,6 +453,7 @@ dependencies = [ "tempfile", "thiserror 2.0.18", "ulid", + "yrs", ] [[package]] @@ -469,6 +556,15 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.30" @@ -531,6 +627,25 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + [[package]] name = "parse-zoneinfo" version = "0.3.1" @@ -712,6 +827,15 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.12.3" @@ -813,6 +937,12 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.228" @@ -883,6 +1013,15 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" +[[package]] +name = "smallstr" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862077b1e764f04c251fe82a2ef562fd78d7cadaeb072ca7c2bcaf7217b1ff3b" +dependencies = [ + "smallvec", +] + [[package]] name = "smallvec" version = "1.15.1" @@ -1352,6 +1491,24 @@ version = "0.57.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" +[[package]] +name = "yrs" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89512f2d869f9947e1c58d57ef86c8f4ca1b1e8ccf24d6e1ff8c7cdbd67d54df" +dependencies = [ + "arc-swap", + "async-lock", + "async-trait", + "dashmap", + "fastrand", + "serde", + "serde_json", + "smallstr", + "smallvec", + "thiserror 2.0.18", +] + [[package]] name = "zerocopy" version = "0.8.50" diff --git a/Cargo.toml b/Cargo.toml index d3a6d85..05610a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ thiserror = "2" anyhow = "1" pulldown-cmark = { version = "0.13", default-features = false } rrule = "0.13" +yrs = "0.26" chrono = { version = "0.4", default-features = false, features = ["clock"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/README.md b/README.md index 3ca86c6..51f4dfd 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision ## Status -**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. The **local-only system is feature-complete and the offline-sync merge engine converges**; remaining work is the network transport, auth, and the Neovim plugin. Built test-first (102 tests at last update). The canonical tracker is **tech-spec §14**. +**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. The **local-only system is feature-complete and the offline-sync merge engine converges** (including a `yrs` text-CRDT for bodies); remaining work is the network transport, auth, and the Neovim plugin. Built test-first (97 tests at last update). The canonical tracker is **tech-spec §14**. | Area | State | |---|---| @@ -19,8 +19,8 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision | `hephd` daemon — **local mode** (file lock + JSON-RPC over a unix socket) | ✅ done | | `heph` CLI; `list` / `health` / `journal` / full-text `search` (FTS5) | ✅ done | | Sync engine — HLC, op-log, converging merge + conflict queue (no network yet) | ✅ done | -| yrs text-CRDT for body merge | ⏳ next | -| `server`/`client` modes + network push/pull sync | ⏳ | +| yrs text-CRDT for body merge | ✅ done | +| `server`/`client` modes + network push/pull sync | ⏳ next | | OIDC/Authentik auth + per-user isolation | ⏳ | | `heph.nvim` (primary surface) | ⏳ | diff --git a/crates/heph-core/Cargo.toml b/crates/heph-core/Cargo.toml index 6701f42..0f0763f 100644 --- a/crates/heph-core/Cargo.toml +++ b/crates/heph-core/Cargo.toml @@ -14,6 +14,7 @@ ulid.workspace = true thiserror.workspace = true pulldown-cmark.workspace = true rrule.workspace = true +yrs.workspace = true chrono.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/heph-core/src/crdt.rs b/crates/heph-core/src/crdt.rs new file mode 100644 index 0000000..eb47e2f --- /dev/null +++ b/crates/heph-core/src/crdt.rs @@ -0,0 +1,210 @@ +//! Body text-CRDT helpers (tech-spec §5, §12). +//! +//! A node's body merges as a [`yrs`] text CRDT. The `nodes.body_crdt` BLOB is +//! the encoded CRDT state; the `nodes.body` TEXT column is its materialized +//! view. Surfaces only ever send whole-buffer text, so a write is **diffed** +//! into the CRDT (common prefix/suffix) and the resulting yrs update travels in +//! the op payload — peers then *merge* concurrent edits instead of clobbering +//! the loser with last-writer-wins. +//! +//! Offsets are bytes ([`OffsetKind::Bytes`]); the diff aligns its cut points to +//! UTF-8 char boundaries so multibyte text is never split mid-codepoint. Each +//! device authors under a stable [`client_id`] derived from its sync `origin`, +//! so its successive edits extend one yrs client sequence (the Yjs model of one +//! client per actor) rather than minting a fresh client per write. + +use yrs::updates::decoder::Decode; +use yrs::{ + ClientID, Doc, GetString, OffsetKind, Options, ReadTxn, StateVector, Text, Transact, Update, +}; + +/// The shared text field name inside every body doc. +const FIELD: &str = "body"; + +/// A stable yrs `client_id` for a device, derived from its sync `origin` via +/// FNV-1a so a device's edits always extend the same client sequence. yrs +/// restricts client ids to 53 bits (JS-safe-integer range), so we fold the +/// hash into that range and keep it non-zero. +pub(crate) fn client_id(origin: &str) -> u64 { + let mut hash: u64 = 0xcbf2_9ce4_8422_2325; + for b in origin.as_bytes() { + hash ^= *b as u64; + hash = hash.wrapping_mul(0x0000_0100_0000_01b3); + } + (hash & ((1u64 << 53) - 1)) | 1 // 53-bit, never 0 +} + +/// Build a byte-offset doc, optionally seeded from a stored state blob. +fn load(client: u64, state: Option<&[u8]>) -> Doc { + let doc = Doc::with_options(Options { + client_id: ClientID::new(client), + offset_kind: OffsetKind::Bytes, + ..Default::default() + }); + if let Some(bytes) = state { + if let Ok(update) = Update::decode_v1(bytes) { + let mut txn = doc.transact_mut(); + let _ = txn.apply_update(update); + } + } + doc +} + +/// Encode the whole doc state for persistence in `body_crdt`. +fn encode_state(doc: &Doc) -> Vec { + doc.transact() + .encode_state_as_update_v1(&StateVector::default()) +} + +/// Materialize the body text. +fn materialize(doc: &Doc) -> String { + let text = doc.get_or_insert_text(FIELD); + text.get_string(&doc.transact()) +} + +/// The outcome of writing a whole body into the CRDT. +pub(crate) struct BodyWrite { + /// New encoded CRDT state to persist in `body_crdt`. + pub state: Vec, + /// The yrs update describing just this write — travels in the op payload. + pub delta: Vec, + /// The materialized body text after the write. + pub body: String, +} + +/// Diff `new_body` into the CRDT seeded from `prev_state`, authoring under +/// `client`. Returns the new state, the delta update for peers, and the +/// materialized text. +pub(crate) fn write_body(client: u64, prev_state: Option<&[u8]>, new_body: &str) -> BodyWrite { + let doc = load(client, prev_state); + let before = doc.transact().state_vector(); + { + let text = doc.get_or_insert_text(FIELD); + let mut txn = doc.transact_mut(); + let cur = text.get_string(&txn); + let (start, del, ins) = diff(&cur, new_body); + if del > 0 { + text.remove_range(&mut txn, start as u32, del as u32); + } + if !ins.is_empty() { + text.insert(&mut txn, start as u32, ins); + } + } + let delta = doc.transact().encode_state_as_update_v1(&before); + BodyWrite { + state: encode_state(&doc), + delta, + body: materialize(&doc), + } +} + +/// The outcome of merging a peer's delta into the CRDT. +pub(crate) struct BodyMerge { + /// New encoded CRDT state to persist in `body_crdt`. + pub state: Vec, + /// The materialized body text after the merge. + pub body: String, +} + +/// Merge a peer's `delta` update into the CRDT seeded from `prev_state`. The +/// merging doc never authors, so its `client_id` is irrelevant. Commutative and +/// idempotent — applying the same delta twice is a no-op. +pub(crate) fn merge_body(prev_state: Option<&[u8]>, delta: &[u8]) -> BodyMerge { + let doc = load(0, prev_state); + if let Ok(update) = Update::decode_v1(delta) { + let mut txn = doc.transact_mut(); + let _ = txn.apply_update(update); + } + BodyMerge { + state: encode_state(&doc), + body: materialize(&doc), + } +} + +/// Materialize a stored CRDT state blob to its body text. +#[cfg(test)] +pub(crate) fn body_of(state: &[u8]) -> String { + materialize(&load(0, Some(state))) +} + +/// Common prefix/suffix diff over byte indices, cut points aligned to UTF-8 +/// char boundaries. Returns `(start, delete_len, inserted)` such that replacing +/// `cur[start..start+delete_len]` with `inserted` yields `new`. +fn diff<'a>(cur: &str, new: &'a str) -> (usize, usize, &'a str) { + let (cb, nb) = (cur.as_bytes(), new.as_bytes()); + + // Longest common prefix (in bytes); cur and new agree below `start`, so a + // char boundary there is the same in both. + let mut start = 0; + let max_pre = cb.len().min(nb.len()); + while start < max_pre && cb[start] == nb[start] { + start += 1; + } + while start > 0 && !cur.is_char_boundary(start) { + start -= 1; + } + + // Longest common suffix not overlapping the prefix. + let (mut ec, mut en) = (cb.len(), nb.len()); + while ec > start && en > start && cb[ec - 1] == nb[en - 1] { + ec -= 1; + en -= 1; + } + // If a cut landed mid-codepoint, extend the changed span forward to the + // next boundary in both strings. + while ec < cb.len() && (!cur.is_char_boundary(ec) || !new.is_char_boundary(en)) { + ec += 1; + en += 1; + } + + (start, ec - start, &new[start..en]) +} + +#[cfg(test)] +mod tests { + use super::*; + + const A: u64 = 0xaaaa; + const B: u64 = 0xbbbb; + + #[test] + fn write_then_materialize_round_trips() { + let w = write_body(A, None, "Hello world"); + assert_eq!(w.body, "Hello world"); + assert_eq!(body_of(&w.state), "Hello world"); + } + + #[test] + fn disjoint_concurrent_edits_merge() { + // A and B share a base, then edit different regions offline. + let base = write_body(A, None, "Hello world"); + let on_b = merge_body(None, &base.delta); // B receives the create + + let edit_a = write_body(A, Some(&base.state), "Hello brave world"); + let edit_b = write_body(B, Some(&on_b.state), "Hello world!"); + + // Each side merges the other's delta; both converge with both edits. + let a_final = merge_body(Some(&edit_a.state), &edit_b.delta); + let b_final = merge_body(Some(&edit_b.state), &edit_a.delta); + assert_eq!(a_final.body, b_final.body, "bodies did not converge"); + assert_eq!(a_final.body, "Hello brave world!"); + } + + #[test] + fn merge_is_idempotent() { + let base = write_body(A, None, "x"); + let edit = write_body(A, Some(&base.state), "xy"); + let once = merge_body(Some(&base.state), &edit.delta); + let twice = merge_body(Some(&once.state), &edit.delta); + assert_eq!(once.body, "xy"); + assert_eq!(twice.body, "xy"); + } + + #[test] + fn multibyte_edit_is_not_split() { + let base = write_body(A, None, "café"); + let edit = write_body(A, Some(&base.state), "café au lait"); + assert_eq!(edit.body, "café au lait"); + assert_eq!(body_of(&edit.state), "café au lait"); + } +} diff --git a/crates/heph-core/src/lib.rs b/crates/heph-core/src/lib.rs index 9124ac5..90f71ac 100644 --- a/crates/heph-core/src/lib.rs +++ b/crates/heph-core/src/lib.rs @@ -9,6 +9,7 @@ //! deterministic. pub mod clock; +mod crdt; pub mod error; pub mod export; pub mod extract; diff --git a/crates/heph-core/src/oplog.rs b/crates/heph-core/src/oplog.rs index 926be35..466e62d 100644 --- a/crates/heph-core/src/oplog.rs +++ b/crates/heph-core/src/oplog.rs @@ -11,9 +11,11 @@ use serde_json::Value; /// Op type discriminators (the `op_type` column). pub mod op_type { - /// A node was created. Payload: `{kind, title, body, created_at}`. + /// A node was created. Payload: `{kind, title, body, created_at}`, plus + /// `body_crdt` (the yrs CRDT seed) when the node has a body. pub const NODE_CREATE: &str = "node.create"; - /// A node's title/body was set (LWW). Payload: `{title, body}`. + /// A node's title/body was set. Payload: `{title, body}`; a body change also + /// carries `body_crdt` (the yrs delta, merged — not LWW — on apply). pub const NODE_SET: &str = "node.set"; /// A node was tombstoned. Payload: `{}`. pub const NODE_TOMBSTONE: &str = "node.tombstone"; diff --git a/crates/heph-core/src/sqlite/apply.rs b/crates/heph-core/src/sqlite/apply.rs index b3be5ff..980679e 100644 --- a/crates/heph-core/src/sqlite/apply.rs +++ b/crates/heph-core/src/sqlite/apply.rs @@ -3,9 +3,11 @@ //! A peer's ops arrive in HLC order; [`apply`] replays each one idempotently //! with the merge rules: //! -//! - **node bodies / titles, task scalars:** last-writer-wins by HLC. A -//! *discarded* value from a different device is recorded in `conflicts` -//! (surfaced, not silently dropped). +//! - **node bodies:** merged through the **yrs text CRDT** (`body_crdt`) — +//! concurrent edits always merge, never a hard conflict. +//! - **node titles, task scalars:** last-writer-wins by HLC. A *discarded* +//! scalar value from a different device is recorded in `conflicts` (surfaced, +//! not silently dropped). //! - **links:** OR-set add/remove keyed by the link's own id → no conflicts. //! - **tombstones:** monotonic — once set, they stay. //! @@ -16,6 +18,7 @@ use rusqlite::{Connection, OptionalExtension}; use serde_json::Value; use super::{absorb_remote_hlc, new_id, nodes, ops}; +use crate::crdt; use crate::error::Result; use crate::hlc::Hlc; use crate::model::Conflict; @@ -120,55 +123,81 @@ fn cross_origin(op: &Op, current_hlc: &str) -> bool { .unwrap_or(false) } -/// Create a node (if absent) or LWW its title/body. +/// The yrs body delta carried by a node op, if any. +fn body_crdt_field(p: &Value) -> Option> { + p.get("body_crdt") + .and_then(|v| serde_json::from_value::>(v.clone()).ok()) +} + +/// Create a node (if absent), or merge its body via the text CRDT and LWW its +/// title. The body **never** produces a conflict — concurrent edits merge. fn node_upsert(tx: &Connection, op: &Op) -> Result<()> { let p = &op.payload; match nodes::get(tx, &op.target_id)? { None => { let kind = str_field(p, "kind").unwrap_or("doc"); let title = str_field(p, "title").unwrap_or(""); - let body = str_field(p, "body"); let created_at = i64_field(p, "created_at").unwrap_or_else(|| op_physical(op)); + // Seed our CRDT from the peer's, so we share its yrs client + // sequence rather than minting our own and duplicating the text. + let (body, body_crdt) = match body_crdt_field(p) { + Some(delta) => { + let m = crdt::merge_body(None, &delta); + (Some(m.body), Some(m.state)) + } + None => (str_field(p, "body").map(str::to_string), None), + }; tx.execute( "INSERT INTO nodes - (id, owner_id, kind, title, body, created_at, modified_at, hlc, tombstoned) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, 0)", + (id, owner_id, kind, title, body, body_crdt, created_at, modified_at, hlc, tombstoned) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?7, ?8, 0)", ( &op.target_id, &op.owner_id, kind, title, - body, + &body, + &body_crdt, created_at, &op.hlc, ), )?; } Some(existing) => { - let new_title = str_field(p, "title"); - let new_body = str_field(p, "body"); - // Conflict: a different device's body differs from ours. - if cross_origin(op, &existing.hlc) { - if let Some(nb) = new_body { - if Some(nb) != existing.body.as_deref() { - record_conflict( - tx, - op, - "body", - existing.body.as_deref(), - Some(nb), - &existing.hlc, - )?; - } + let op_wins = op.hlc.as_str() > existing.hlc.as_str(); + // Title is a scalar — last-writer-wins by HLC. + let title = if op_wins { + str_field(p, "title").unwrap_or(&existing.title).to_string() + } else { + existing.title.clone() + }; + let hlc = if op_wins { + op.hlc.clone() + } else { + existing.hlc + }; + let modified = if op_wins { + op_physical(op) + } else { + existing.modified_at + }; + // Body merges through the CRDT regardless of HLC order (commutative, + // idempotent); a title-only op carries no delta and leaves it. + match body_crdt_field(p) { + Some(delta) => { + let prev = nodes::get_body_crdt(tx, &op.target_id)?; + let m = crdt::merge_body(prev.as_deref(), &delta); + tx.execute( + "UPDATE nodes SET title = ?1, body = ?2, body_crdt = ?3, modified_at = ?4, hlc = ?5 WHERE id = ?6", + (&title, &m.body, &m.state, modified, &hlc, &op.target_id), + )?; + } + None => { + tx.execute( + "UPDATE nodes SET title = ?1, modified_at = ?2, hlc = ?3 WHERE id = ?4", + (&title, modified, &hlc, &op.target_id), + )?; } - } - if op.hlc.as_str() > existing.hlc.as_str() { - let title = new_title.unwrap_or(&existing.title).to_string(); - let body = new_body.map(str::to_string).or(existing.body.clone()); - tx.execute( - "UPDATE nodes SET title = ?1, body = ?2, modified_at = ?3, hlc = ?4 WHERE id = ?5", - (&title, &body, op_physical(op), &op.hlc, &op.target_id), - )?; } } } diff --git a/crates/heph-core/src/sqlite/nodes.rs b/crates/heph-core/src/sqlite/nodes.rs index fde9876..ba04907 100644 --- a/crates/heph-core/src/sqlite/nodes.rs +++ b/crates/heph-core/src/sqlite/nodes.rs @@ -4,19 +4,43 @@ use rusqlite::{Connection, OptionalExtension, Row}; use serde_json::json; -use super::{links, new_id, next_hlc, ops}; +use super::{links, meta_get, new_id, next_hlc, ops}; +use crate::crdt; use crate::error::{Error, Result}; use crate::model::{deterministic_id, NewNode, Node, NodeKind}; use crate::oplog::op_type; -/// Op payload describing a node's identity/content for `node.create`. -fn create_payload(node: &Node) -> serde_json::Value { - json!({ +/// Op payload describing a node's identity/content for `node.create`. A node +/// with a body carries the CRDT seed (`body_crdt`) so peers adopt our yrs +/// client sequence rather than minting their own and duplicating the text. +fn create_payload(node: &Node, body_crdt: Option<&[u8]>) -> serde_json::Value { + let mut p = json!({ "kind": node.kind.as_str(), "title": node.title, "body": node.body, "created_at": node.created_at, - }) + }); + if let Some(d) = body_crdt { + p["body_crdt"] = json!(d); + } + p +} + +/// This device's stable yrs `client_id`, derived from its sync `origin`. +fn device_client(conn: &Connection) -> Result { + let origin = meta_get(conn, "origin")? + .ok_or_else(|| Error::Integrity("missing device origin".into()))?; + Ok(crdt::client_id(&origin)) +} + +/// Read a node's stored CRDT body state, if any. +pub(super) fn get_body_crdt(conn: &Connection, id: &str) -> Result>> { + let v: Option>> = conn + .query_row("SELECT body_crdt FROM nodes WHERE id = ?1", [id], |r| { + r.get(0) + }) + .optional()?; + Ok(v.flatten()) } /// The `nodes` columns in a fixed order, shared by every SELECT here. @@ -47,7 +71,8 @@ pub(super) fn build( } /// Record a `node.create` op for an already-inserted node (used when a caller -/// builds + inserts a node directly, e.g. `task.create`). +/// builds + inserts a node directly, e.g. `task.create`, whose nodes carry no +/// body and so need no CRDT seed). pub(super) fn record_create(conn: &Connection, owner: &str, node: &Node) -> Result<()> { ops::record( conn, @@ -55,7 +80,7 @@ pub(super) fn record_create(conn: &Connection, owner: &str, node: &Node) -> Resu &node.hlc, op_type::NODE_CREATE, &node.id, - create_payload(node), + create_payload(node, None), ) } @@ -95,22 +120,59 @@ pub(super) fn from_row(row: &Row) -> rusqlite::Result { }) } -/// Create and persist a node, recording a `node.create` op. +/// Create and persist a node, recording a `node.create` op. A node with a body +/// is seeded into the text CRDT (`body_crdt`) and the seed travels in the op. pub(super) fn create(conn: &Connection, owner: &str, now: i64, input: NewNode) -> Result { let hlc = next_hlc(conn, now)?; - let node = build(owner, now, &hlc, input.kind, input.title, input.body); + let mut node = build(owner, now, &hlc, input.kind, input.title, input.body); + let write = match node.body.as_deref() { + Some(b) => Some(crdt::write_body(device_client(conn)?, None, b)), + None => None, + }; + if let Some(w) = &write { + node.body = Some(w.body.clone()); + } insert(conn, &node)?; + if let Some(w) = &write { + set_body_crdt(conn, &node.id, &w.state)?; + } ops::record( conn, owner, &node.hlc, op_type::NODE_CREATE, &node.id, - create_payload(&node), + create_payload(&node, write.as_ref().map(|w| w.delta.as_slice())), )?; Ok(node) } +/// Persist a node's CRDT body state. +fn set_body_crdt(conn: &Connection, id: &str, state: &[u8]) -> Result<()> { + conn.execute("UPDATE nodes SET body_crdt = ?1 WHERE id = ?2", (state, id))?; + Ok(()) +} + +/// Rewrite a node's body **locally**, diffing it into the text CRDT so `body` +/// and `body_crdt` stay consistent. Records **no** op: used by recurrence +/// roll-forward, whose checklist reset stays device-local (as it did before the +/// CRDT landed). +pub(super) fn rewrite_body_local( + conn: &Connection, + now: i64, + id: &str, + new_body: &str, +) -> Result<()> { + let prev = get_body_crdt(conn, id)?; + let write = crdt::write_body(device_client(conn)?, prev.as_deref(), new_body); + let hlc = next_hlc(conn, now)?; + conn.execute( + "UPDATE nodes SET body = ?1, body_crdt = ?2, modified_at = ?3, hlc = ?4 WHERE id = ?5", + (&write.body, &write.state, now, &hlc, id), + )?; + Ok(()) +} + /// Open today's (or `date`'s) journal node, creating it if absent. The id is /// **deterministic** in `(owner, date)` so independent offline creations /// converge (tech-spec §3.1). `date` must be an ISO `YYYY-MM-DD`. @@ -140,14 +202,16 @@ pub(super) fn open_or_create_journal( hlc: next_hlc(conn, now)?, tombstoned: false, }; + let write = crdt::write_body(device_client(conn)?, None, ""); insert(conn, &node)?; + set_body_crdt(conn, &node.id, &write.state)?; ops::record( conn, owner, &node.hlc, op_type::NODE_CREATE, &node.id, - create_payload(&node), + create_payload(&node, Some(&write.delta)), )?; Ok(node) } @@ -202,24 +266,40 @@ pub(super) fn update( let tx = conn.transaction()?; node.hlc = next_hlc(&tx, now)?; - tx.execute( - "UPDATE nodes SET title = ?1, body = ?2, modified_at = ?3, hlc = ?4 WHERE id = ?5", - ( - &node.title, - &node.body, - node.modified_at, - &node.hlc, - &node.id, - ), - )?; - ops::record( - &tx, - owner, - &node.hlc, - op_type::NODE_SET, - &node.id, - json!({ "title": node.title, "body": node.body }), - )?; + // A body change is diffed into the text CRDT; the resulting yrs delta rides + // the op so peers merge it (no last-writer-wins clobber). Title is a scalar + // and stays LWW. + let write = if body_changed { + let prev = get_body_crdt(&tx, &node.id)?; + let w = crdt::write_body( + device_client(&tx)?, + prev.as_deref(), + node.body.as_deref().unwrap_or(""), + ); + node.body = Some(w.body.clone()); + Some(w) + } else { + None + }; + match &write { + Some(w) => { + tx.execute( + "UPDATE nodes SET title = ?1, body = ?2, body_crdt = ?3, modified_at = ?4, hlc = ?5 WHERE id = ?6", + (&node.title, &node.body, &w.state, node.modified_at, &node.hlc, &node.id), + )?; + } + None => { + tx.execute( + "UPDATE nodes SET title = ?1, modified_at = ?2, hlc = ?3 WHERE id = ?4", + (&node.title, node.modified_at, &node.hlc, &node.id), + )?; + } + } + let payload = match &write { + Some(w) => json!({ "title": node.title, "body": node.body, "body_crdt": w.delta }), + None => json!({ "title": node.title, "body": node.body }), + }; + ops::record(&tx, owner, &node.hlc, op_type::NODE_SET, &node.id, payload)?; if body_changed { links::sync_wiki_links( &tx, diff --git a/crates/heph-core/src/sqlite/tasks.rs b/crates/heph-core/src/sqlite/tasks.rs index 771d47a..197c1c0 100644 --- a/crates/heph-core/src/sqlite/tasks.rs +++ b/crates/heph-core/src/sqlite/tasks.rs @@ -209,11 +209,7 @@ fn roll_forward(conn: &mut Connection, owner: &str, now: i64, task: &Task) -> Re let body = doc.body.unwrap_or_default(); let reset = recurrence::reset_checkboxes(&body); if reset != body { - let hlc = next_hlc(&tx, now)?; - tx.execute( - "UPDATE nodes SET body = ?1, modified_at = ?2, hlc = ?3 WHERE id = ?4", - (&reset, now, hlc, &doc_id), - )?; + nodes::rewrite_body_local(&tx, now, &doc_id, &reset)?; links::sync_wiki_links(&tx, owner, &doc_id, &reset, now)?; } } diff --git a/crates/heph-core/tests/convergence.rs b/crates/heph-core/tests/convergence.rs index a445771..aa50b64 100644 --- a/crates/heph-core/tests/convergence.rs +++ b/crates/heph-core/tests/convergence.rs @@ -1,7 +1,8 @@ -//! Op-log + merge convergence (tech-spec §12, slice 8b/8c). Two local replicas -//! (distinct origins) exchange ops; we assert they converge and that ambiguous -//! merges surface as conflicts. Body merge here is LWW; the yrs text CRDT lands -//! in a follow-up. No network yet — we hand ops across directly. +//! Op-log + merge convergence (tech-spec §12, slice 8b–8d). Two local replicas +//! (distinct origins) exchange ops; we assert they converge, that ambiguous +//! scalar merges surface as conflicts, and that concurrent body edits *merge* +//! through the yrs text CRDT instead of clobbering with last-writer-wins. No +//! network yet — we hand ops across directly. use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Op, Store, TaskState}; use std::sync::atomic::{AtomicI64, Ordering}; @@ -127,17 +128,23 @@ fn offline_divergent_scalar_edits_converge_with_a_conflict() { } #[test] -fn concurrent_body_edits_converge_lww() { +fn concurrent_body_edits_merge_via_crdt() { + // Both replicas edit *different regions* of a shared body offline. The text + // CRDT merges both edits — neither is lost to last-writer-wins, and no + // conflict is enqueued. let (mut a, ca) = replica(1000); let (mut b, cb) = replica(1000); - let n = a.create_node(NewNode::doc("Note", "base")).unwrap(); + let n = a.create_node(NewNode::doc("Note", "Hello world")).unwrap(); sync_one_way(&a, &mut b, None); + // A inserts in the middle; B appends at the end. ca.set(2000); - a.update_node(&n.id, None, Some("A's edit".into())).unwrap(); + a.update_node(&n.id, None, Some("Hello brave world".into())) + .unwrap(); cb.set(2500); - b.update_node(&n.id, None, Some("B's edit".into())).unwrap(); + b.update_node(&n.id, None, Some("Hello world!".into())) + .unwrap(); sync_one_way(&a, &mut b, None); sync_one_way(&b, &mut a, None); @@ -145,7 +152,11 @@ fn concurrent_body_edits_converge_lww() { let ba = a.get_node(&n.id).unwrap().unwrap().body; let bb = b.get_node(&n.id).unwrap().unwrap().body; assert_eq!(ba, bb, "bodies did not converge"); - assert_eq!(ba.as_deref(), Some("B's edit")); // later HLC wins + assert_eq!(ba.as_deref(), Some("Hello brave world!")); // both edits survive + assert!( + a.conflicts_list().unwrap().is_empty() && b.conflicts_list().unwrap().is_empty(), + "body merges must not enqueue conflicts" + ); } #[test] diff --git a/docs/changelog.d/v1-prototype.feature.md b/docs/changelog.d/v1-prototype.feature.md index f1275ba..2ecef02 100644 --- a/docs/changelog.d/v1-prototype.feature.md +++ b/docs/changelog.d/v1-prototype.feature.md @@ -7,4 +7,6 @@ Begin the v1 prototype (Phase 1, tech-spec §11.1), built in TDD slices: - Recurrence — roll-forward in place (§4.4): completing a recurring task resets its checklist to all-unchecked, logs the occurrence, and advances the do-date to the next RRULE instance after now (skipping misses) — completion never carries forward (proptest-checked). Per-task append-only logs (`log-of`) with `log.append`/`log.tail`; `skip` advances without logging. - `hephd` daemon, local mode (§3, §6): exclusive file lock (handoff-ready), line-delimited JSON-RPC over a unix socket exposing the node/task/next/links/log methods, with DB work on tokio's blocking pool. Synchronous client for surfaces/CLI. Model types are serde-serializable. - `heph` CLI (§1) — a thin client of the daemon: `next`, `task`, `doc`, `get`, `export`. Export materializes the store to a `/.md` tree with YAML frontmatter + body (§5), one-way, tombstones excluded. +- Sync engine, local-only (§12): real hybrid logical clock + persistent device `origin`; an append-only op-log per mutation; an idempotent, order-independent merge/apply engine — last-writer-wins task scalars (discards surfaced in a `conflicts` queue), OR-set links, monotonic tombstones. Two-replica convergence proven. +- Body text CRDT (§5, §12, slice 8d): node bodies now merge through the `yrs` text CRDT (`body_crdt`) instead of last-writer-wins — whole-buffer writes are diffed into the doc and the yrs delta rides the op, so concurrent edits to different regions both survive and never enqueue a conflict. - CI runs the Rust suite (fmt/clippy/test) via the project build hook. diff --git a/docs/reference/tech-spec.md b/docs/reference/tech-spec.md index e6e7e32..f696f70 100644 --- a/docs/reference/tech-spec.md +++ b/docs/reference/tech-spec.md @@ -1,6 +1,6 @@ --- title: Technical Specification -modified: 2026-05-31 +modified: 2026-06-01 tags: - reference - design @@ -326,7 +326,7 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi ## 14. Implementation status (Phase 1 tracker) -> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **102 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet). +> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **97 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet). **Done** @@ -336,16 +336,16 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi - ✅ **Ranking (§7):** pure two-stage filter + reorderable named dimensions; proptest total order. - ✅ **Daemon RPC (§6) — local subset:** node.get/create/update/tombstone, task.create/set_state/set_attention/skip, next, list, health, journal.open_or_create, search, links.outgoing/backlinks, log.append/tail, export, conflicts.list/resolve, sync (ops_since/apply_op). Line-delimited JSON-RPC over a unix socket; sync `Client`. - ✅ **Runtime modes (§3.1) — `local` only:** exclusive file-lock handoff via `LockGuard`. -- ✅ **Sync engine (§12) minus network:** HLC (clock-injected, monotonic) + persistent device `origin`; op-log per mutation; `apply_op` merge — **LWW** scalars/bodies with a **conflict queue**, **OR-set** links, monotonic tombstones, idempotent; two-replica convergence proven. `adopt_owner` = basic §13 canonical-owner adoption. +- ✅ **Sync engine (§12) minus network:** HLC (clock-injected, monotonic) + persistent device `origin`; op-log per mutation; `apply_op` merge — **LWW** task scalars + titles with a **conflict queue**, **OR-set** links, monotonic tombstones, idempotent; two-replica convergence proven. `adopt_owner` = basic §13 canonical-owner adoption. +- ✅ **Body text CRDT (§5, §12, slice 8d):** node bodies merge through the **`yrs`** text CRDT (`body_crdt` BLOB) instead of LWW. A device authors under a stable `client_id` derived from its `origin`; whole-buffer writes are diffed (common prefix/suffix, char-boundary safe) into the doc; the yrs delta rides the `node.create`/`node.set` op (`body_crdt` field) and `apply` merges it — concurrent disjoint edits both survive and never enqueue a conflict. - ✅ **CLI (§1):** `heph` next/task/doc/get/export/search/journal. - ✅ **CI (§9):** `.forgejo/scripts/build` runs fmt/clippy/test (self-bootstrapping rustup). **Not yet done (resume order)** -1. ⏳ **Body CRDT (§5, §12):** replace body LWW with the **yrs** text CRDT — `body_crdt` BLOB, diff whole-body writes into the yrs doc, merge yrs updates in `apply`. (CRDT lib ratified = `yrs`.) -2. ⏳ **`server`/`client` modes + network sync (§3.1, §6.1, §12):** `RemoteStore`; hub network endpoint; push/pull by HLC cursor (the merge logic already exists); background sync; `sync.now`/`sync.status` RPC; multi-replica-over-real-sockets tests. Open: transport (`axum` HTTP/JSON vs gRPC), propagation cadence, device-id/hub registration. -3. ⏳ **OIDC/Authentik auth (§13):** device-code flow, bearer token on the hub endpoint, full per-user isolation, adoption-with-deterministic-ids. -4. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner). +1. ⏳ **`server`/`client` modes + network sync (§3.1, §6.1, §12):** `RemoteStore`; hub network endpoint; push/pull by HLC cursor (the merge logic already exists); background sync; `sync.now`/`sync.status` RPC; multi-replica-over-real-sockets tests. Open: transport (`axum` HTTP/JSON vs gRPC), propagation cadence, device-id/hub registration. +2. ⏳ **OIDC/Authentik auth (§13):** device-code flow, bearer token on the hub endpoint, full per-user isolation, adoption-with-deterministic-ids. +3. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner). ## Related