diff --git a/Cargo.lock b/Cargo.lock index 35fc93e..ddad7a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,6 +363,7 @@ dependencies = [ "rrule", "rusqlite", "serde", + "serde_json", "tempfile", "thiserror 2.0.18", "ulid", diff --git a/crates/heph-core/Cargo.toml b/crates/heph-core/Cargo.toml index 718de84..6701f42 100644 --- a/crates/heph-core/Cargo.toml +++ b/crates/heph-core/Cargo.toml @@ -16,6 +16,7 @@ pulldown-cmark.workspace = true rrule.workspace = true chrono.workspace = true serde.workspace = true +serde_json.workspace = true [dev-dependencies] proptest = "1" diff --git a/crates/heph-core/src/lib.rs b/crates/heph-core/src/lib.rs index dd673f3..9124ac5 100644 --- a/crates/heph-core/src/lib.rs +++ b/crates/heph-core/src/lib.rs @@ -14,6 +14,7 @@ pub mod export; pub mod extract; pub mod hlc; pub mod model; +pub mod oplog; pub mod ranking; pub mod recurrence; pub mod sqlite; @@ -25,9 +26,10 @@ pub use export::{render as render_export, ExportFile, NodeExport}; pub use extract::{extract, ContextItem, Extraction}; pub use hlc::{Hlc, HlcClock}; pub use model::{ - deterministic_id, Attention, Health, Link, LinkType, NewNode, NewTask, Node, NodeKind, Task, - TaskState, + deterministic_id, Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, + NodeKind, Task, TaskState, }; +pub use oplog::Op; pub use ranking::{rank, Dimension, RankedTask, RANKING}; pub use recurrence::{next_occurrence, reset_checkboxes}; pub use sqlite::LocalStore; diff --git a/crates/heph-core/src/model.rs b/crates/heph-core/src/model.rs index 728db08..879e0cf 100644 --- a/crates/heph-core/src/model.rs +++ b/crates/heph-core/src/model.rs @@ -264,6 +264,31 @@ pub struct Health { pub sync_status: String, } +/// An ambiguous merge surfaced to the user (a discarded LWW value, tech-spec +/// §12). The winning value is already in the store; this records what was +/// dropped so `heph conflicts` can show and settle it. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Conflict { + /// Conflict id. + pub id: String, + /// The node the conflict is about. + pub node_id: String, + /// Which field / region (`body`, `do_date`, `state`, …). + pub field: String, + /// The local value at merge time. + pub local_val: Option, + /// The incoming remote value. + pub remote_val: Option, + /// HLC of the local value. + pub local_hlc: String, + /// HLC of the remote value. + pub remote_hlc: String, + /// `open` or `resolved`. + pub status: String, + /// When recorded, epoch ms. + pub created_at: i64, +} + /// Deterministic id for key-unique kinds (`journal`/`tag`) so two offline /// replicas that independently create the same logical singleton converge /// (tech-spec §3.1, [[design]] §3.1). Content nodes use random ULIDs instead. diff --git a/crates/heph-core/src/oplog.rs b/crates/heph-core/src/oplog.rs new file mode 100644 index 0000000..926be35 --- /dev/null +++ b/crates/heph-core/src/oplog.rs @@ -0,0 +1,49 @@ +//! The operation log — the unit of sync (tech-spec §12). +//! +//! Every local mutation appends an append-only [`Op`] describing the change, +//! stamped with the writer's [`Hlc`](crate::hlc::Hlc). Sync exchanges ops by +//! HLC cursor; a peer applies foreign ops with the merge rules (LWW for +//! scalars, OR-set for links, CRDT for bodies). Local ops are recorded already +//! `applied` (the materialized tables were written in the same transaction). + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// Op type discriminators (the `op_type` column). +pub mod op_type { + /// A node was created. Payload: `{kind, title, body, created_at}`. + pub const NODE_CREATE: &str = "node.create"; + /// A node's title/body was set (LWW). Payload: `{title, body}`. + pub const NODE_SET: &str = "node.set"; + /// A node was tombstoned. Payload: `{}`. + pub const NODE_TOMBSTONE: &str = "node.tombstone"; + /// A task row was created. Payload: the task scalars. + pub const TASK_CREATE: &str = "task.create"; + /// One or more task scalars were set (LWW). Payload: the changed scalars. + pub const TASK_SET: &str = "task.set"; + /// A link was added (OR-set add). Payload: `{src, dst, type}`. + pub const LINK_ADD: &str = "link.add"; + /// A link was removed/tombstoned (OR-set remove). Payload: `{}`. + pub const LINK_REMOVE: &str = "link.remove"; +} + +/// One operation-log entry (a row of `oplog`). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Op { + /// ULID id of the op. + pub id: String, + /// Owning user. + pub owner_id: String, + /// HLC stamp (encoded) — the causal-order key. + pub hlc: String, + /// Originating device id. + pub origin: String, + /// Op type (see [`op_type`]). + pub op_type: String, + /// The node/link this op targets. + pub target_id: String, + /// Type-specific JSON payload. + pub payload: Value, + /// Whether this op has been applied to the materialized tables. + pub applied: bool, +} diff --git a/crates/heph-core/src/sqlite/apply.rs b/crates/heph-core/src/sqlite/apply.rs new file mode 100644 index 0000000..b3be5ff --- /dev/null +++ b/crates/heph-core/src/sqlite/apply.rs @@ -0,0 +1,340 @@ +//! Applying foreign ops — the merge engine (tech-spec §12). +//! +//! A peer's ops arrive in HLC order; [`apply`] replays each one idempotently +//! with the merge rules: +//! +//! - **node bodies / titles, task scalars:** last-writer-wins by HLC. A +//! *discarded* value from a different device is recorded in `conflicts` +//! (surfaced, not silently dropped). +//! - **links:** OR-set add/remove keyed by the link's own id → no conflicts. +//! - **tombstones:** monotonic — once set, they stay. +//! +//! Idempotent: an op whose id we've already stored is a no-op. The local clock +//! absorbs each op's HLC so future local stamps stay ahead. + +use rusqlite::{Connection, OptionalExtension}; +use serde_json::Value; + +use super::{absorb_remote_hlc, new_id, nodes, ops}; +use crate::error::Result; +use crate::hlc::Hlc; +use crate::model::Conflict; +use crate::oplog::{op_type, Op}; + +/// Open conflicts for `owner`, newest first. +pub(super) fn list_conflicts(conn: &Connection, owner: &str) -> Result> { + let mut stmt = conn.prepare( + "SELECT id, node_id, field, local_val, remote_val, local_hlc, remote_hlc, status, created_at + FROM conflicts WHERE owner_id = ?1 AND status = 'open' ORDER BY created_at DESC, id", + )?; + let rows = stmt.query_map([owner], |r| { + Ok(Conflict { + id: r.get(0)?, + node_id: r.get(1)?, + field: r.get(2)?, + local_val: r.get(3)?, + remote_val: r.get(4)?, + local_hlc: r.get(5)?, + remote_hlc: r.get(6)?, + status: r.get(7)?, + created_at: r.get(8)?, + }) + })?; + Ok(rows.collect::>>()?) +} + +/// Settle a conflict. v1 records the user's choice by marking it resolved; the +/// LWW winner is already materialized (choosing the loser's value is a +/// follow-up — see [[design]]). +pub(super) fn resolve_conflict(conn: &Connection, id: &str, _choice: &str) -> Result<()> { + conn.execute( + "UPDATE conflicts SET status = 'resolved' WHERE id = ?1", + [id], + )?; + Ok(()) +} + +/// Apply a foreign op. Returns `true` if newly applied, `false` if already seen. +pub(super) fn apply(conn: &mut Connection, op: &Op) -> Result { + if ops::exists(conn, &op.id)? { + return Ok(false); + } + let tx = conn.transaction()?; + // Ensure the op's owner exists so node FKs hold even before owner adoption + // (tech-spec §13). In practice replicas share a canonical owner. + tx.execute( + "INSERT OR IGNORE INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'remote', 0)", + [&op.owner_id], + )?; + let applied = match op.op_type.as_str() { + op_type::NODE_CREATE => { + node_upsert(&tx, op)?; + true + } + op_type::NODE_SET => { + node_upsert(&tx, op)?; + true + } + op_type::NODE_TOMBSTONE => { + node_tombstone(&tx, op)?; + true + } + op_type::TASK_CREATE | op_type::TASK_SET => { + task_set(&tx, op)?; + true + } + op_type::LINK_ADD => { + link_add(&tx, op)?; + true + } + op_type::LINK_REMOVE => { + link_remove(&tx, op)?; + true + } + // Unknown op types are stored but not applied (forward compatibility). + _ => false, + }; + ops::insert_op(&tx, op, applied)?; + absorb_remote_hlc(&tx, &op.hlc)?; + tx.commit()?; + Ok(true) +} + +fn str_field<'a>(payload: &'a Value, key: &str) -> Option<&'a str> { + payload.get(key).and_then(Value::as_str) +} + +fn i64_field(payload: &Value, key: &str) -> Option { + payload.get(key).and_then(Value::as_i64) +} + +/// The op's physical time (for `modified_at`/conflict timestamps). +fn op_physical(op: &Op) -> i64 { + Hlc::parse(&op.hlc).map(|h| h.physical).unwrap_or(0) +} + +/// Did a *different* device write the current value (so a divergence is real)? +fn cross_origin(op: &Op, current_hlc: &str) -> bool { + Hlc::parse(current_hlc) + .map(|h| h.origin != op.origin) + .unwrap_or(false) +} + +/// Create a node (if absent) or LWW its title/body. +fn node_upsert(tx: &Connection, op: &Op) -> Result<()> { + let p = &op.payload; + match nodes::get(tx, &op.target_id)? { + None => { + let kind = str_field(p, "kind").unwrap_or("doc"); + let title = str_field(p, "title").unwrap_or(""); + let body = str_field(p, "body"); + let created_at = i64_field(p, "created_at").unwrap_or_else(|| op_physical(op)); + tx.execute( + "INSERT INTO nodes + (id, owner_id, kind, title, body, created_at, modified_at, hlc, tombstoned) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, 0)", + ( + &op.target_id, + &op.owner_id, + kind, + title, + body, + created_at, + &op.hlc, + ), + )?; + } + Some(existing) => { + let new_title = str_field(p, "title"); + let new_body = str_field(p, "body"); + // Conflict: a different device's body differs from ours. + if cross_origin(op, &existing.hlc) { + if let Some(nb) = new_body { + if Some(nb) != existing.body.as_deref() { + record_conflict( + tx, + op, + "body", + existing.body.as_deref(), + Some(nb), + &existing.hlc, + )?; + } + } + } + if op.hlc.as_str() > existing.hlc.as_str() { + let title = new_title.unwrap_or(&existing.title).to_string(); + let body = new_body.map(str::to_string).or(existing.body.clone()); + tx.execute( + "UPDATE nodes SET title = ?1, body = ?2, modified_at = ?3, hlc = ?4 WHERE id = ?5", + (&title, &body, op_physical(op), &op.hlc, &op.target_id), + )?; + } + } + } + Ok(()) +} + +fn node_tombstone(tx: &Connection, op: &Op) -> Result<()> { + // Monotonic: tombstone always wins. Bump hlc only if this op is newer. + if let Some(existing) = nodes::get(tx, &op.target_id)? { + let hlc = if op.hlc.as_str() > existing.hlc.as_str() { + op.hlc.clone() + } else { + existing.hlc.clone() + }; + tx.execute( + "UPDATE nodes SET tombstoned = 1, modified_at = ?1, hlc = ?2 WHERE id = ?3", + (op_physical(op), hlc, &op.target_id), + )?; + } + Ok(()) +} + +fn task_set(tx: &Connection, op: &Op) -> Result<()> { + // The backing node must already exist (ordered delivery: create precedes). + let Some(node) = nodes::get(tx, &op.target_id)? else { + return Ok(()); + }; + let p = &op.payload; + let task_exists: bool = tx + .query_row( + "SELECT 1 FROM tasks WHERE node_id = ?1", + [&op.target_id], + |_| Ok(()), + ) + .optional()? + .is_some(); + let op_wins = op.hlc.as_str() > node.hlc.as_str(); + + // Conflict: a different device's scalar snapshot differs from ours. + if task_exists && cross_origin(op, &node.hlc) { + let cur: Option<(Option, String)> = tx + .query_row( + "SELECT do_date, state FROM tasks WHERE node_id = ?1", + [&op.target_id], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + .optional()?; + if let Some((cur_do, cur_state)) = cur { + let op_do = i64_field(p, "do_date"); + let op_state = str_field(p, "state").unwrap_or("outstanding"); + if cur_do != op_do { + record_conflict( + tx, + op, + "do_date", + cur_do.map(|v| v.to_string()).as_deref(), + op_do.map(|v| v.to_string()).as_deref(), + &node.hlc, + )?; + } + if cur_state != op_state { + record_conflict(tx, op, "state", Some(&cur_state), Some(op_state), &node.hlc)?; + } + } + } + + if !task_exists || op_wins { + let attention = str_field(p, "attention"); + let do_date = i64_field(p, "do_date"); + let late_on = i64_field(p, "late_on"); + let state = str_field(p, "state").unwrap_or("outstanding"); + let recurrence = str_field(p, "recurrence"); + if task_exists { + tx.execute( + "UPDATE tasks SET attention = ?1, do_date = ?2, late_on = ?3, + state = ?4, recurrence = ?5 WHERE node_id = ?6", + ( + attention, + do_date, + late_on, + state, + recurrence, + &op.target_id, + ), + )?; + } else { + tx.execute( + "INSERT INTO tasks (node_id, attention, do_date, late_on, state, recurrence) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + ( + &op.target_id, + attention, + do_date, + late_on, + state, + recurrence, + ), + )?; + } + if op_wins { + tx.execute( + "UPDATE nodes SET modified_at = ?1, hlc = ?2 WHERE id = ?3", + (op_physical(op), &op.hlc, &op.target_id), + )?; + } + } + Ok(()) +} + +fn link_add(tx: &Connection, op: &Op) -> Result<()> { + let exists: bool = tx + .query_row("SELECT 1 FROM links WHERE id = ?1", [&op.target_id], |_| { + Ok(()) + }) + .optional()? + .is_some(); + if !exists { + let p = &op.payload; + tx.execute( + "INSERT INTO links (id, src_id, dst_id, type, created_at, tombstoned) + VALUES (?1, ?2, ?3, ?4, ?5, 0)", + ( + &op.target_id, + str_field(p, "src").unwrap_or(""), + str_field(p, "dst").unwrap_or(""), + str_field(p, "type").unwrap_or("wiki"), + i64_field(p, "created_at").unwrap_or_else(|| op_physical(op)), + ), + )?; + } + Ok(()) +} + +fn link_remove(tx: &Connection, op: &Op) -> Result<()> { + tx.execute( + "UPDATE links SET tombstoned = 1 WHERE id = ?1", + [&op.target_id], + )?; + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn record_conflict( + tx: &Connection, + op: &Op, + field: &str, + local_val: Option<&str>, + remote_val: Option<&str>, + local_hlc: &str, +) -> Result<()> { + tx.execute( + "INSERT INTO conflicts + (id, owner_id, node_id, field, local_val, remote_val, + local_hlc, remote_hlc, status, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'open', ?9)", + ( + new_id(), + &op.owner_id, + &op.target_id, + field, + local_val, + remote_val, + local_hlc, + &op.hlc, + op_physical(op), + ), + )?; + Ok(()) +} diff --git a/crates/heph-core/src/sqlite/links.rs b/crates/heph-core/src/sqlite/links.rs index 7e5c46a..b29d3f2 100644 --- a/crates/heph-core/src/sqlite/links.rs +++ b/crates/heph-core/src/sqlite/links.rs @@ -4,10 +4,12 @@ use std::collections::HashSet; use rusqlite::{Connection, OptionalExtension, Row}; -use super::new_id; +use super::{new_id, next_hlc, ops}; use crate::error::Result; use crate::extract::extract; use crate::model::{Link, LinkType}; +use crate::oplog::op_type; +use serde_json::json; const COLUMNS: &str = "id, src_id, dst_id, type, created_at, tombstoned"; @@ -26,6 +28,7 @@ fn from_row(row: &Row) -> rusqlite::Result { /// Add a typed link. pub(super) fn add( conn: &Connection, + owner: &str, now: i64, src_id: &str, dst_id: &str, @@ -50,6 +53,20 @@ pub(super) fn add( link.created_at, ), )?; + let hlc = next_hlc(conn, now)?; + ops::record( + conn, + owner, + &hlc, + op_type::LINK_ADD, + &link.id, + json!({ + "src": link.src_id, + "dst": link.dst_id, + "type": link.link_type.as_str(), + "created_at": link.created_at, + }), + )?; Ok(link) } @@ -128,12 +145,14 @@ pub(super) fn sync_wiki_links( for (link_id, dst) in &existing { if !desired_set.contains(dst) { conn.execute("UPDATE links SET tombstoned = 1 WHERE id = ?1", [link_id])?; + let hlc = next_hlc(conn, now)?; + ops::record(conn, owner, &hlc, op_type::LINK_REMOVE, link_id, json!({}))?; } } // Add links for newly-referenced targets. for dst in &desired { if !existing_dsts.contains(dst.as_str()) { - add(conn, now, src_id, dst, LinkType::Wiki)?; + add(conn, owner, now, src_id, dst, LinkType::Wiki)?; } } Ok(()) diff --git a/crates/heph-core/src/sqlite/log.rs b/crates/heph-core/src/sqlite/log.rs index f44944b..2088225 100644 --- a/crates/heph-core/src/sqlite/log.rs +++ b/crates/heph-core/src/sqlite/log.rs @@ -9,9 +9,12 @@ use rusqlite::Connection; -use super::{links, next_hlc, nodes}; +use serde_json::json; + +use super::{links, next_hlc, nodes, ops}; use crate::error::{Error, Result}; use crate::model::{LinkType, NodeKind}; +use crate::oplog::op_type; /// The task's log doc id, creating (and linking) it on first use. pub(super) fn ensure_doc( @@ -35,7 +38,8 @@ pub(super) fn ensure_doc( Some(String::new()), ); nodes::insert(conn, &doc)?; - links::add(conn, now, task_id, &doc.id, LinkType::LogOf)?; + nodes::record_create(conn, owner, &doc)?; + links::add(conn, owner, now, task_id, &doc.id, LinkType::LogOf)?; Ok(doc.id) } @@ -53,7 +57,15 @@ pub(super) fn append( let hlc = next_hlc(conn, now)?; conn.execute( "UPDATE nodes SET body = ?1, modified_at = ?2, hlc = ?3 WHERE id = ?4", - (&new_body, now, hlc, &doc_id), + (&new_body, now, &hlc, &doc_id), + )?; + ops::record( + conn, + owner, + &hlc, + op_type::NODE_SET, + &doc_id, + json!({ "title": doc.title, "body": new_body }), )?; Ok(()) } diff --git a/crates/heph-core/src/sqlite/mod.rs b/crates/heph-core/src/sqlite/mod.rs index a72b17f..93103b4 100644 --- a/crates/heph-core/src/sqlite/mod.rs +++ b/crates/heph-core/src/sqlite/mod.rs @@ -9,11 +9,13 @@ //! as free functions over a `&Connection`; the [`Store`] impl here is a thin //! delegating layer so a transaction can span several of them. +mod apply; mod exporter; mod links; mod log; mod migrations; mod nodes; +mod ops; mod tasks; pub use migrations::latest_version; @@ -26,7 +28,10 @@ use ulid::Ulid; use crate::clock::Clock; use crate::error::{Error, Result}; use crate::hlc::Hlc; -use crate::model::{Attention, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState}; +use crate::model::{ + Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState, +}; +use crate::oplog::Op; use crate::ranking::RankedTask; use crate::store::Store; @@ -99,6 +104,20 @@ pub(super) fn meta_set(conn: &Connection, key: &str, value: &str) -> Result<()> Ok(()) } +/// Advance the persisted clock past a remote `hlc` (encoded), so future local +/// stamps exceed everything we've seen (tech-spec §12). Encoded HLCs sort by +/// causal order, so a string compare suffices. +pub(super) fn absorb_remote_hlc(conn: &Connection, remote: &str) -> Result<()> { + let bump = match meta_get(conn, "last_hlc")? { + Some(last) => remote > last.as_str(), + None => true, + }; + if bump { + meta_set(conn, "last_hlc", remote)?; + } + Ok(()) +} + /// Ensure this store has a stable device `origin` id. fn ensure_origin(conn: &Connection) -> Result<()> { if meta_get(conn, "origin")?.is_none() { @@ -176,7 +195,7 @@ impl Store for LocalStore { fn tombstone_node(&mut self, id: &str) -> Result<()> { let now = self.clock.now_ms(); - nodes::tombstone(&self.conn, now, id) + nodes::tombstone(&self.conn, &self.owner_id, now, id) } fn create_task(&mut self, input: NewTask) -> Result { @@ -195,12 +214,12 @@ impl Store for LocalStore { fn skip_recurrence(&mut self, node_id: &str) -> Result { let now = self.clock.now_ms(); - tasks::skip(&self.conn, now, node_id) + tasks::skip(&self.conn, &self.owner_id, now, node_id) } fn set_task_attention(&mut self, node_id: &str, attention: Attention) -> Result { let now = self.clock.now_ms(); - tasks::set_attention(&self.conn, now, node_id, attention) + tasks::set_attention(&self.conn, &self.owner_id, now, node_id, attention) } fn next(&self, scope: Option<&str>, limit: usize) -> Result> { @@ -232,7 +251,7 @@ impl Store for LocalStore { fn add_link(&mut self, src_id: &str, dst_id: &str, link_type: LinkType) -> Result { let now = self.clock.now_ms(); - links::add(&self.conn, now, src_id, dst_id, link_type) + links::add(&self.conn, &self.owner_id, now, src_id, dst_id, link_type) } fn outgoing_links(&self, id: &str) -> Result> { @@ -258,6 +277,50 @@ impl Store for LocalStore { fn export(&self, dir: &std::path::Path) -> Result { exporter::export(&self.conn, &self.owner_id, dir) } + + fn ops_since(&self, after: Option<&str>) -> Result> { + ops::since(&self.conn, &self.owner_id, after) + } + + fn apply_op(&mut self, op: &Op) -> Result { + apply::apply(&mut self.conn, op) + } + + fn adopt_owner(&mut self, canonical: &str) -> Result<()> { + if self.owner_id == canonical { + return Ok(()); + } + let old = self.owner_id.clone(); + let tx = self.conn.transaction()?; + tx.execute( + "INSERT OR IGNORE INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'user', 0)", + [canonical], + )?; + tx.execute( + "UPDATE nodes SET owner_id = ?1 WHERE owner_id = ?2", + (canonical, &old), + )?; + tx.execute( + "UPDATE oplog SET owner_id = ?1 WHERE owner_id = ?2", + (canonical, &old), + )?; + tx.execute( + "UPDATE conflicts SET owner_id = ?1 WHERE owner_id = ?2", + (canonical, &old), + )?; + tx.execute("DELETE FROM users WHERE id = ?1", [&old])?; + tx.commit()?; + self.owner_id = canonical.to_string(); + Ok(()) + } + + fn conflicts_list(&self) -> Result> { + apply::list_conflicts(&self.conn, &self.owner_id) + } + + fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()> { + apply::resolve_conflict(&self.conn, id, choice) + } } #[cfg(test)] diff --git a/crates/heph-core/src/sqlite/nodes.rs b/crates/heph-core/src/sqlite/nodes.rs index 1eae0b3..fde9876 100644 --- a/crates/heph-core/src/sqlite/nodes.rs +++ b/crates/heph-core/src/sqlite/nodes.rs @@ -2,9 +2,22 @@ use rusqlite::{Connection, OptionalExtension, Row}; -use super::{links, new_id, next_hlc}; +use serde_json::json; + +use super::{links, new_id, next_hlc, ops}; use crate::error::{Error, Result}; use crate::model::{deterministic_id, NewNode, Node, NodeKind}; +use crate::oplog::op_type; + +/// Op payload describing a node's identity/content for `node.create`. +fn create_payload(node: &Node) -> serde_json::Value { + json!({ + "kind": node.kind.as_str(), + "title": node.title, + "body": node.body, + "created_at": node.created_at, + }) +} /// The `nodes` columns in a fixed order, shared by every SELECT here. pub(super) const COLUMNS: &str = @@ -33,6 +46,19 @@ pub(super) fn build( } } +/// Record a `node.create` op for an already-inserted node (used when a caller +/// builds + inserts a node directly, e.g. `task.create`). +pub(super) fn record_create(conn: &Connection, owner: &str, node: &Node) -> Result<()> { + ops::record( + conn, + owner, + &node.hlc, + op_type::NODE_CREATE, + &node.id, + create_payload(node), + ) +} + /// Insert a fully-formed [`Node`] row. pub(super) fn insert(conn: &Connection, node: &Node) -> Result<()> { conn.execute( @@ -69,11 +95,19 @@ pub(super) fn from_row(row: &Row) -> rusqlite::Result { }) } -/// Create and persist a node. +/// Create and persist a node, recording a `node.create` op. pub(super) fn create(conn: &Connection, owner: &str, now: i64, input: NewNode) -> Result { let hlc = next_hlc(conn, now)?; let node = build(owner, now, &hlc, input.kind, input.title, input.body); insert(conn, &node)?; + ops::record( + conn, + owner, + &node.hlc, + op_type::NODE_CREATE, + &node.id, + create_payload(&node), + )?; Ok(node) } @@ -107,6 +141,14 @@ pub(super) fn open_or_create_journal( tombstoned: false, }; insert(conn, &node)?; + ops::record( + conn, + owner, + &node.hlc, + op_type::NODE_CREATE, + &node.id, + create_payload(&node), + )?; Ok(node) } @@ -170,6 +212,14 @@ pub(super) fn update( &node.id, ), )?; + ops::record( + &tx, + owner, + &node.hlc, + op_type::NODE_SET, + &node.id, + json!({ "title": node.title, "body": node.body }), + )?; if body_changed { links::sync_wiki_links( &tx, @@ -211,25 +261,15 @@ pub(super) fn aliases(conn: &Connection, id: &str) -> Result> { /// Tombstone (soft-delete) a node. No hard deletes — tombstones keep merge /// monotonic (tech-spec §4.3). -pub(super) fn tombstone(conn: &Connection, now: i64, id: &str) -> Result<()> { +pub(super) fn tombstone(conn: &Connection, owner: &str, now: i64, id: &str) -> Result<()> { let hlc = next_hlc(conn, now)?; let updated = conn.execute( "UPDATE nodes SET tombstoned = 1, modified_at = ?1, hlc = ?2 WHERE id = ?3", - (now, hlc, id), + (now, &hlc, id), )?; if updated == 0 { return Err(Error::NodeNotFound(id.to_string())); } - Ok(()) -} - -/// Bump `modified_at`/`hlc` on a node (used when a task scalar field changes so -/// the node's modified time reflects the mutation for sync ordering). -pub(super) fn touch(conn: &Connection, now: i64, id: &str) -> Result<()> { - let hlc = next_hlc(conn, now)?; - conn.execute( - "UPDATE nodes SET modified_at = ?1, hlc = ?2 WHERE id = ?3", - (now, hlc, id), - )?; + ops::record(conn, owner, &hlc, op_type::NODE_TOMBSTONE, id, json!({}))?; Ok(()) } diff --git a/crates/heph-core/src/sqlite/ops.rs b/crates/heph-core/src/sqlite/ops.rs new file mode 100644 index 0000000..a4b7f76 --- /dev/null +++ b/crates/heph-core/src/sqlite/ops.rs @@ -0,0 +1,97 @@ +//! `oplog` table operations — recording local ops and reading them by HLC +//! cursor (tech-spec §12). The merge/apply path lives in [`super::apply`]. + +use rusqlite::{Connection, Row}; +use serde_json::Value; + +use super::new_id; +use crate::error::Result; +use crate::hlc::Hlc; +use crate::oplog::Op; + +const COLUMNS: &str = "id, owner_id, hlc, origin, op_type, target_id, payload, applied"; + +/// Record a **local** op (already applied to the materialized tables in this +/// same transaction). The originating device is recovered from `hlc`. +pub(super) fn record( + conn: &Connection, + owner: &str, + hlc: &str, + op_type: &str, + target_id: &str, + payload: Value, +) -> Result<()> { + let origin = Hlc::parse(hlc)?.origin; + conn.execute( + "INSERT INTO oplog (id, owner_id, hlc, origin, op_type, target_id, payload, applied) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1)", + ( + new_id(), + owner, + hlc, + origin, + op_type, + target_id, + payload.to_string(), + ), + )?; + Ok(()) +} + +fn from_row(row: &Row) -> rusqlite::Result { + let payload_text: String = row.get("payload")?; + let payload: Value = serde_json::from_str(&payload_text).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e)) + })?; + Ok(Op { + id: row.get("id")?, + owner_id: row.get("owner_id")?, + hlc: row.get("hlc")?, + origin: row.get("origin")?, + op_type: row.get("op_type")?, + target_id: row.get("target_id")?, + payload, + applied: row.get::<_, i64>("applied")? != 0, + }) +} + +/// Ops for `owner` with HLC strictly greater than `after` (None ⇒ all), in +/// causal (HLC) order — the push cursor for sync. +pub(super) fn since(conn: &Connection, owner: &str, after: Option<&str>) -> Result> { + let sql = format!( + "SELECT {COLUMNS} FROM oplog + WHERE owner_id = ?1 AND (?2 IS NULL OR hlc > ?2) + ORDER BY hlc" + ); + let mut stmt = conn.prepare(&sql)?; + let rows = stmt.query_map((owner, after), from_row)?; + Ok(rows.collect::>>()?) +} + +/// Store a foreign [`Op`] verbatim (its own id/hlc/origin), marking whether it +/// was applied to the materialized tables. +pub(super) fn insert_op(conn: &Connection, op: &Op, applied: bool) -> Result<()> { + conn.execute( + "INSERT INTO oplog (id, owner_id, hlc, origin, op_type, target_id, payload, applied) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + ( + &op.id, + &op.owner_id, + &op.hlc, + &op.origin, + &op.op_type, + &op.target_id, + op.payload.to_string(), + applied as i64, + ), + )?; + Ok(()) +} + +/// Whether an op with this id has already been recorded (idempotent apply). +pub(super) fn exists(conn: &Connection, op_id: &str) -> Result { + let n: i64 = conn.query_row("SELECT COUNT(*) FROM oplog WHERE id = ?1", [op_id], |r| { + r.get(0) + })?; + Ok(n > 0) +} diff --git a/crates/heph-core/src/sqlite/tasks.rs b/crates/heph-core/src/sqlite/tasks.rs index 343d675..771d47a 100644 --- a/crates/heph-core/src/sqlite/tasks.rs +++ b/crates/heph-core/src/sqlite/tasks.rs @@ -5,12 +5,46 @@ use rusqlite::{Connection, OptionalExtension, Row}; -use super::{links, log, next_hlc, nodes}; +use serde_json::json; + +use super::{links, log, next_hlc, nodes, ops}; use crate::error::{Error, Result}; use crate::model::{Attention, Health, LinkType, NewTask, NodeKind, Task, TaskState}; +use crate::oplog::op_type; use crate::ranking::{self, RankedTask}; use crate::recurrence; +/// JSON payload of a task's scalar fields (for `task.create`/`task.set` ops). +fn scalar_payload(t: &Task) -> serde_json::Value { + json!({ + "attention": t.attention.map(|a| a.as_str()), + "do_date": t.do_date, + "late_on": t.late_on, + "state": t.state.as_str(), + "recurrence": t.recurrence, + }) +} + +/// Bump the task node's hlc/modified_at and record a `task.set` op snapshotting +/// the task's current scalars (LWW unit, tech-spec §12). +fn record_set(conn: &Connection, owner: &str, now: i64, node_id: &str) -> Result<()> { + let task = require(conn, node_id)?; + let hlc = next_hlc(conn, now)?; + conn.execute( + "UPDATE nodes SET modified_at = ?1, hlc = ?2 WHERE id = ?3", + (now, &hlc, node_id), + )?; + ops::record( + conn, + owner, + &hlc, + op_type::TASK_SET, + node_id, + scalar_payload(&task), + )?; + Ok(()) +} + fn from_row(row: &Row) -> rusqlite::Result { let attention = match row.get::<_, Option>("attention")? { Some(s) => Some( @@ -57,6 +91,7 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas None, ); nodes::insert(&tx, &task_node)?; + nodes::record_create(&tx, owner, &task_node)?; tx.execute( "INSERT INTO tasks (node_id, attention, do_date, late_on, state, recurrence) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", @@ -69,6 +104,19 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas &task.recurrence, ), )?; + let task = Task { + node_id: task_node.id.clone(), + ..task + }; + let task_create_hlc = next_hlc(&tx, now)?; + ops::record( + &tx, + owner, + &task_create_hlc, + op_type::TASK_CREATE, + &task.node_id, + scalar_payload(&task), + )?; // The canonical context doc (the task's jumping-off point / checklist body). let doc_hlc = next_hlc(&tx, now)?; @@ -81,18 +129,30 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas Some(String::new()), ); nodes::insert(&tx, &doc)?; - links::add(&tx, now, &task_node.id, &doc.id, LinkType::CanonicalContext)?; + nodes::record_create(&tx, owner, &doc)?; + links::add( + &tx, + owner, + now, + &task.node_id, + &doc.id, + LinkType::CanonicalContext, + )?; if let Some(project_id) = &input.project_id { - links::add(&tx, now, &task_node.id, project_id, LinkType::InProject)?; + links::add( + &tx, + owner, + now, + &task.node_id, + project_id, + LinkType::InProject, + )?; } tx.commit()?; - Ok(Task { - node_id: task_node.id, - ..task - }) + Ok(task) } /// Fetch a task by node id. @@ -128,7 +188,7 @@ pub(super) fn set_state( "UPDATE tasks SET state = ?1 WHERE node_id = ?2", (state.as_str(), node_id), )?; - nodes::touch(conn, now, node_id)?; + record_set(conn, owner, now, node_id)?; require(conn, node_id) } @@ -168,7 +228,7 @@ fn roll_forward(conn: &mut Connection, owner: &str, now: i64, task: &Task) -> Re // 3. Advance the do-date (or finally finish a finite series). advance(&tx, now, &task.node_id, rrule, task.do_date)?; - nodes::touch(&tx, now, &task.node_id)?; + record_set(&tx, owner, now, &task.node_id)?; tx.commit()?; require(conn, &task.node_id) @@ -203,14 +263,14 @@ fn advance( /// Skip the current occurrence of a recurring task: advance the do-date the same /// way as completion but **without** logging a completion (tech-spec §4.4). -pub(super) fn skip(conn: &Connection, now: i64, node_id: &str) -> Result { +pub(super) fn skip(conn: &Connection, owner: &str, now: i64, node_id: &str) -> Result { let task = require(conn, node_id)?; let rrule = task .recurrence .as_deref() .ok_or_else(|| Error::Integrity(format!("skip on non-recurring task {node_id}")))?; advance(conn, now, node_id, rrule, task.do_date)?; - nodes::touch(conn, now, node_id)?; + record_set(conn, owner, now, node_id)?; require(conn, node_id) } @@ -348,6 +408,7 @@ fn load_candidates(conn: &Connection, owner: &str) -> Result> { /// Set a task's attention-state. pub(super) fn set_attention( conn: &Connection, + owner: &str, now: i64, node_id: &str, attention: Attention, @@ -359,6 +420,6 @@ pub(super) fn set_attention( if updated == 0 { return Err(Error::NodeNotFound(node_id.to_string())); } - nodes::touch(conn, now, node_id)?; + record_set(conn, owner, now, node_id)?; require(conn, node_id) } diff --git a/crates/heph-core/src/store.rs b/crates/heph-core/src/store.rs index 2893565..c4aaea6 100644 --- a/crates/heph-core/src/store.rs +++ b/crates/heph-core/src/store.rs @@ -5,7 +5,10 @@ //! `RemoteStore`) is configuration. This trait is the seam. use crate::error::Result; -use crate::model::{Attention, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState}; +use crate::model::{ + Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState, +}; +use crate::oplog::Op; use crate::ranking::RankedTask; /// A backend that can store and retrieve nodes, tasks, and links. @@ -106,4 +109,27 @@ pub trait Store { /// Export every non-tombstoned node to a `.md` directory tree under `dir`, /// returning the count written (tech-spec §5). One-way; no import. fn export(&self, dir: &std::path::Path) -> Result; + + // --- sync (op-log) --- + + /// Ops for this owner with HLC strictly greater than `after` (None ⇒ all), + /// in causal order — the push cursor for sync (tech-spec §12). + fn ops_since(&self, after: Option<&str>) -> Result>; + + /// Apply a foreign op with the merge rules (LWW / OR-set / tombstone), + /// idempotently. Returns `true` if newly applied. Ops should be applied in + /// HLC order (tech-spec §12). + fn apply_op(&mut self, op: &Op) -> Result; + + /// Rewrite this replica's `owner_id` to a canonical user id — the one-time, + /// pre-first-sync adoption of tech-spec §13. After this all data is owned by + /// `canonical`; replicas that adopt the same id can sync. (Full adoption, + /// incl. owner-embedded deterministic ids, is refined with auth.) + fn adopt_owner(&mut self, canonical: &str) -> Result<()>; + + /// Open merge conflicts surfaced for the user (`heph conflicts`). + fn conflicts_list(&self) -> Result>; + + /// Settle a conflict by the user's choice (`"local"`/`"remote"`). + fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()>; } diff --git a/crates/heph-core/tests/convergence.rs b/crates/heph-core/tests/convergence.rs new file mode 100644 index 0000000..a445771 --- /dev/null +++ b/crates/heph-core/tests/convergence.rs @@ -0,0 +1,207 @@ +//! Op-log + merge convergence (tech-spec §12, slice 8b/8c). Two local replicas +//! (distinct origins) exchange ops; we assert they converge and that ambiguous +//! merges surface as conflicts. Body merge here is LWW; the yrs text CRDT lands +//! in a follow-up. No network yet — we hand ops across directly. + +use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Op, Store, TaskState}; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; + +#[derive(Clone)] +struct StepClock(Arc); +impl StepClock { + fn new(ms: i64) -> Self { + StepClock(Arc::new(AtomicI64::new(ms))) + } + fn set(&self, ms: i64) { + self.0.store(ms, Ordering::SeqCst); + } +} +impl Clock for StepClock { + fn now_ms(&self) -> i64 { + self.0.load(Ordering::SeqCst) + } +} + +/// The shared canonical user both replicas adopt — the §13 model where every +/// device of one human carries the same owner id. +const OWNER: &str = "canonical-user"; + +fn replica(now: i64) -> (LocalStore, StepClock) { + let c = StepClock::new(now); + let mut s = LocalStore::open_in_memory(Box::new(c.clone())).unwrap(); + s.adopt_owner(OWNER).unwrap(); + (s, c) +} + +/// Push every op from `src` (after `cursor`) into `dst`, in HLC order. +fn sync_one_way(src: &dyn Store, dst: &mut dyn Store, cursor: Option<&str>) -> Option { + let ops: Vec = src.ops_since(cursor).unwrap(); + let mut last = cursor.map(str::to_string); + for op in &ops { + dst.apply_op(op).unwrap(); + last = Some(op.hlc.clone()); + } + last +} + +#[test] +fn online_round_trip_propagates_a_node() { + let (mut a, _ca) = replica(1000); + let (mut b, _cb) = replica(1000); + + let n = a + .create_node(NewNode::doc("Roof", "shingles need work")) + .unwrap(); + sync_one_way(&a, &mut b, None); + + let on_b = b.get_node(&n.id).unwrap().expect("node reached B"); + assert_eq!(on_b.title, "Roof"); + assert_eq!(on_b.body.as_deref(), Some("shingles need work")); +} + +#[test] +fn apply_is_idempotent() { + let (mut a, _ca) = replica(1000); + let (mut b, _cb) = replica(1000); + let n = a.create_node(NewNode::doc("X", "y")).unwrap(); + + // Apply all of A's ops to B twice — second pass is a no-op. + sync_one_way(&a, &mut b, None); + let again: Vec = a.ops_since(None).unwrap(); + for op in &again { + assert!( + !b.apply_op(op).unwrap(), + "re-applying {} mutated B", + op.op_type + ); + } + assert_eq!(b.get_node(&n.id).unwrap().unwrap().title, "X"); +} + +#[test] +fn offline_divergent_scalar_edits_converge_with_a_conflict() { + // A creates a task; B learns it. Then both go offline and set a different + // do_date. After exchanging, both converge to the higher-HLC value and each + // records a conflict for the discarded value. + let (mut a, ca) = replica(1000); + let (mut b, cb) = replica(1000); + + let task = a + .create_task(NewTask { + title: "Renew passport".into(), + attention: Some(Attention::Orange), + ..Default::default() + }) + .unwrap(); + sync_one_way(&a, &mut b, None); + + // Divergent offline edits. B's edit is later (higher physical time) → wins. + ca.set(2000); + a.set_task_state(&task.node_id, TaskState::Done).unwrap(); + cb.set(3000); + b.set_task_attention(&task.node_id, Attention::Red).unwrap(); + // Give each a distinct do_date too (the conflicting field). + // (set_task_* above already produced task.set ops snapshotting scalars.) + + // Exchange the divergent ops. + sync_one_way(&a, &mut b, None); + sync_one_way(&b, &mut a, None); + + // Both replicas converge to identical task scalars. + let ta = a.get_task(&task.node_id).unwrap().unwrap(); + let tb = b.get_task(&task.node_id).unwrap().unwrap(); + assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}"); + // B wrote last (t=3000) → its attention=red wins on both. + assert_eq!(ta.attention, Some(Attention::Red)); + + // Each replica surfaced the divergence as a conflict (not silently merged). + assert!( + !a.conflicts_list().unwrap().is_empty(), + "A recorded no conflict" + ); + assert!( + !b.conflicts_list().unwrap().is_empty(), + "B recorded no conflict" + ); +} + +#[test] +fn concurrent_body_edits_converge_lww() { + let (mut a, ca) = replica(1000); + let (mut b, cb) = replica(1000); + + let n = a.create_node(NewNode::doc("Note", "base")).unwrap(); + sync_one_way(&a, &mut b, None); + + ca.set(2000); + a.update_node(&n.id, None, Some("A's edit".into())).unwrap(); + cb.set(2500); + b.update_node(&n.id, None, Some("B's edit".into())).unwrap(); + + sync_one_way(&a, &mut b, None); + sync_one_way(&b, &mut a, None); + + let ba = a.get_node(&n.id).unwrap().unwrap().body; + let bb = b.get_node(&n.id).unwrap().unwrap().body; + assert_eq!(ba, bb, "bodies did not converge"); + assert_eq!(ba.as_deref(), Some("B's edit")); // later HLC wins +} + +#[test] +fn links_are_an_or_set() { + // A and B each add a different link to the same node; both survive. + let (mut a, ca) = replica(1000); + let (mut b, cb) = replica(1000); + + let src = a.create_node(NewNode::doc("src", "")).unwrap(); + let d1 = a.create_node(NewNode::doc("d1", "")).unwrap(); + sync_one_way(&a, &mut b, None); + let d2 = b.create_node(NewNode::doc("d2", "")).unwrap(); + sync_one_way(&b, &mut a, None); + + ca.set(2000); + a.add_link(&src.id, &d1.id, heph_core::LinkType::Blocks) + .unwrap(); + cb.set(2000); + b.add_link(&src.id, &d2.id, heph_core::LinkType::Blocks) + .unwrap(); + + sync_one_way(&a, &mut b, None); + sync_one_way(&b, &mut a, None); + + let dsts_a: Vec = a + .outgoing_links(&src.id) + .unwrap() + .into_iter() + .filter(|l| l.link_type == heph_core::LinkType::Blocks) + .map(|l| l.dst_id) + .collect(); + let mut dsts_b: Vec = b + .outgoing_links(&src.id) + .unwrap() + .into_iter() + .filter(|l| l.link_type == heph_core::LinkType::Blocks) + .map(|l| l.dst_id) + .collect(); + let mut sorted_a = dsts_a.clone(); + sorted_a.sort(); + dsts_b.sort(); + assert_eq!(sorted_a, dsts_b, "link sets did not converge"); + assert!(dsts_b.contains(&d1.id) && dsts_b.contains(&d2.id)); +} + +#[test] +fn tombstones_propagate_and_are_monotonic() { + let (mut a, _ca) = replica(1000); + let (mut b, _cb) = replica(1000); + let n = a.create_node(NewNode::doc("doomed", "")).unwrap(); + sync_one_way(&a, &mut b, None); + + a.tombstone_node(&n.id).unwrap(); + sync_one_way(&a, &mut b, None); + + assert!(b.get_node(&n.id).unwrap().unwrap().tombstoned); + // Tombstoned nodes drop out of search/next on B. + assert!(b.search("doomed").unwrap().is_empty()); +}