heph-core: op-log recording + merge/apply engine (sync 8b/8c)
Some checks failed
Build / validate (pull_request) Failing after 3s

The conceptual core of sync: every mutation records an Op, and foreign
ops are applied with merge rules to converge replicas (tech-spec §12).

Recording (8b): each node/task/link mutation appends an oplog Op stamped
with its HLC — node.create/set/tombstone, task.create/set, link.add/
remove. `Store::ops_since(cursor)` is the push cursor.

Merge/apply (8c): `Store::apply_op` replays a foreign op idempotently —
  - bodies/titles + task scalars: last-writer-wins by HLC; a discarded
    cross-device value is recorded in `conflicts` (surfaced, not dropped);
  - links: OR-set add/remove by link id;
  - tombstones: monotonic.
The local clock absorbs each applied HLC. `conflicts_list`/
`conflicts_resolve` expose the queue. `adopt_owner` rewrites a replica to
a canonical user id (basic §13 adoption) so replicas can share data.

13 tests: HLC stamping (4) + 6-case two-replica convergence (round-trip,
idempotency, scalar LWW + conflict, body LWW, link OR-set, monotonic
tombstone). 102 tests green. Body merge is LWW for now; the yrs text
CRDT (8d) upgrades concurrent body edits to auto-merge.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Erich Blume 2026-05-31 21:29:20 -07:00
commit b05ddf4bb5
14 changed files with 983 additions and 40 deletions

1
Cargo.lock generated
View file

@ -363,6 +363,7 @@ dependencies = [
"rrule",
"rusqlite",
"serde",
"serde_json",
"tempfile",
"thiserror 2.0.18",
"ulid",

View file

@ -16,6 +16,7 @@ pulldown-cmark.workspace = true
rrule.workspace = true
chrono.workspace = true
serde.workspace = true
serde_json.workspace = true
[dev-dependencies]
proptest = "1"

View file

@ -14,6 +14,7 @@ pub mod export;
pub mod extract;
pub mod hlc;
pub mod model;
pub mod oplog;
pub mod ranking;
pub mod recurrence;
pub mod sqlite;
@ -25,9 +26,10 @@ pub use export::{render as render_export, ExportFile, NodeExport};
pub use extract::{extract, ContextItem, Extraction};
pub use hlc::{Hlc, HlcClock};
pub use model::{
deterministic_id, Attention, Health, Link, LinkType, NewNode, NewTask, Node, NodeKind, Task,
TaskState,
deterministic_id, Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node,
NodeKind, Task, TaskState,
};
pub use oplog::Op;
pub use ranking::{rank, Dimension, RankedTask, RANKING};
pub use recurrence::{next_occurrence, reset_checkboxes};
pub use sqlite::LocalStore;

View file

@ -264,6 +264,31 @@ pub struct Health {
pub sync_status: String,
}
/// An ambiguous merge surfaced to the user (a discarded LWW value, tech-spec
/// §12). The winning value is already in the store; this records what was
/// dropped so `heph conflicts` can show and settle it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Conflict {
/// Conflict id.
pub id: String,
/// The node the conflict is about.
pub node_id: String,
/// Which field / region (`body`, `do_date`, `state`, …).
pub field: String,
/// The local value at merge time.
pub local_val: Option<String>,
/// The incoming remote value.
pub remote_val: Option<String>,
/// HLC of the local value.
pub local_hlc: String,
/// HLC of the remote value.
pub remote_hlc: String,
/// `open` or `resolved`.
pub status: String,
/// When recorded, epoch ms.
pub created_at: i64,
}
/// Deterministic id for key-unique kinds (`journal`/`tag`) so two offline
/// replicas that independently create the same logical singleton converge
/// (tech-spec §3.1, [[design]] §3.1). Content nodes use random ULIDs instead.

View file

@ -0,0 +1,49 @@
//! The operation log — the unit of sync (tech-spec §12).
//!
//! Every local mutation appends an append-only [`Op`] describing the change,
//! stamped with the writer's [`Hlc`](crate::hlc::Hlc). Sync exchanges ops by
//! HLC cursor; a peer applies foreign ops with the merge rules (LWW for
//! scalars, OR-set for links, CRDT for bodies). Local ops are recorded already
//! `applied` (the materialized tables were written in the same transaction).
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Op type discriminators (the `op_type` column).
pub mod op_type {
/// A node was created. Payload: `{kind, title, body, created_at}`.
pub const NODE_CREATE: &str = "node.create";
/// A node's title/body was set (LWW). Payload: `{title, body}`.
pub const NODE_SET: &str = "node.set";
/// A node was tombstoned. Payload: `{}`.
pub const NODE_TOMBSTONE: &str = "node.tombstone";
/// A task row was created. Payload: the task scalars.
pub const TASK_CREATE: &str = "task.create";
/// One or more task scalars were set (LWW). Payload: the changed scalars.
pub const TASK_SET: &str = "task.set";
/// A link was added (OR-set add). Payload: `{src, dst, type}`.
pub const LINK_ADD: &str = "link.add";
/// A link was removed/tombstoned (OR-set remove). Payload: `{}`.
pub const LINK_REMOVE: &str = "link.remove";
}
/// One operation-log entry (a row of `oplog`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Op {
/// ULID id of the op.
pub id: String,
/// Owning user.
pub owner_id: String,
/// HLC stamp (encoded) — the causal-order key.
pub hlc: String,
/// Originating device id.
pub origin: String,
/// Op type (see [`op_type`]).
pub op_type: String,
/// The node/link this op targets.
pub target_id: String,
/// Type-specific JSON payload.
pub payload: Value,
/// Whether this op has been applied to the materialized tables.
pub applied: bool,
}

View file

@ -0,0 +1,340 @@
//! Applying foreign ops — the merge engine (tech-spec §12).
//!
//! A peer's ops arrive in HLC order; [`apply`] replays each one idempotently
//! with the merge rules:
//!
//! - **node bodies / titles, task scalars:** last-writer-wins by HLC. A
//! *discarded* value from a different device is recorded in `conflicts`
//! (surfaced, not silently dropped).
//! - **links:** OR-set add/remove keyed by the link's own id → no conflicts.
//! - **tombstones:** monotonic — once set, they stay.
//!
//! Idempotent: an op whose id we've already stored is a no-op. The local clock
//! absorbs each op's HLC so future local stamps stay ahead.
use rusqlite::{Connection, OptionalExtension};
use serde_json::Value;
use super::{absorb_remote_hlc, new_id, nodes, ops};
use crate::error::Result;
use crate::hlc::Hlc;
use crate::model::Conflict;
use crate::oplog::{op_type, Op};
/// Open conflicts for `owner`, newest first.
pub(super) fn list_conflicts(conn: &Connection, owner: &str) -> Result<Vec<Conflict>> {
let mut stmt = conn.prepare(
"SELECT id, node_id, field, local_val, remote_val, local_hlc, remote_hlc, status, created_at
FROM conflicts WHERE owner_id = ?1 AND status = 'open' ORDER BY created_at DESC, id",
)?;
let rows = stmt.query_map([owner], |r| {
Ok(Conflict {
id: r.get(0)?,
node_id: r.get(1)?,
field: r.get(2)?,
local_val: r.get(3)?,
remote_val: r.get(4)?,
local_hlc: r.get(5)?,
remote_hlc: r.get(6)?,
status: r.get(7)?,
created_at: r.get(8)?,
})
})?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
/// Settle a conflict. v1 records the user's choice by marking it resolved; the
/// LWW winner is already materialized (choosing the loser's value is a
/// follow-up — see [[design]]).
pub(super) fn resolve_conflict(conn: &Connection, id: &str, _choice: &str) -> Result<()> {
conn.execute(
"UPDATE conflicts SET status = 'resolved' WHERE id = ?1",
[id],
)?;
Ok(())
}
/// Apply a foreign op. Returns `true` if newly applied, `false` if already seen.
pub(super) fn apply(conn: &mut Connection, op: &Op) -> Result<bool> {
if ops::exists(conn, &op.id)? {
return Ok(false);
}
let tx = conn.transaction()?;
// Ensure the op's owner exists so node FKs hold even before owner adoption
// (tech-spec §13). In practice replicas share a canonical owner.
tx.execute(
"INSERT OR IGNORE INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'remote', 0)",
[&op.owner_id],
)?;
let applied = match op.op_type.as_str() {
op_type::NODE_CREATE => {
node_upsert(&tx, op)?;
true
}
op_type::NODE_SET => {
node_upsert(&tx, op)?;
true
}
op_type::NODE_TOMBSTONE => {
node_tombstone(&tx, op)?;
true
}
op_type::TASK_CREATE | op_type::TASK_SET => {
task_set(&tx, op)?;
true
}
op_type::LINK_ADD => {
link_add(&tx, op)?;
true
}
op_type::LINK_REMOVE => {
link_remove(&tx, op)?;
true
}
// Unknown op types are stored but not applied (forward compatibility).
_ => false,
};
ops::insert_op(&tx, op, applied)?;
absorb_remote_hlc(&tx, &op.hlc)?;
tx.commit()?;
Ok(true)
}
fn str_field<'a>(payload: &'a Value, key: &str) -> Option<&'a str> {
payload.get(key).and_then(Value::as_str)
}
fn i64_field(payload: &Value, key: &str) -> Option<i64> {
payload.get(key).and_then(Value::as_i64)
}
/// The op's physical time (for `modified_at`/conflict timestamps).
fn op_physical(op: &Op) -> i64 {
Hlc::parse(&op.hlc).map(|h| h.physical).unwrap_or(0)
}
/// Did a *different* device write the current value (so a divergence is real)?
fn cross_origin(op: &Op, current_hlc: &str) -> bool {
Hlc::parse(current_hlc)
.map(|h| h.origin != op.origin)
.unwrap_or(false)
}
/// Create a node (if absent) or LWW its title/body.
fn node_upsert(tx: &Connection, op: &Op) -> Result<()> {
let p = &op.payload;
match nodes::get(tx, &op.target_id)? {
None => {
let kind = str_field(p, "kind").unwrap_or("doc");
let title = str_field(p, "title").unwrap_or("");
let body = str_field(p, "body");
let created_at = i64_field(p, "created_at").unwrap_or_else(|| op_physical(op));
tx.execute(
"INSERT INTO nodes
(id, owner_id, kind, title, body, created_at, modified_at, hlc, tombstoned)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, 0)",
(
&op.target_id,
&op.owner_id,
kind,
title,
body,
created_at,
&op.hlc,
),
)?;
}
Some(existing) => {
let new_title = str_field(p, "title");
let new_body = str_field(p, "body");
// Conflict: a different device's body differs from ours.
if cross_origin(op, &existing.hlc) {
if let Some(nb) = new_body {
if Some(nb) != existing.body.as_deref() {
record_conflict(
tx,
op,
"body",
existing.body.as_deref(),
Some(nb),
&existing.hlc,
)?;
}
}
}
if op.hlc.as_str() > existing.hlc.as_str() {
let title = new_title.unwrap_or(&existing.title).to_string();
let body = new_body.map(str::to_string).or(existing.body.clone());
tx.execute(
"UPDATE nodes SET title = ?1, body = ?2, modified_at = ?3, hlc = ?4 WHERE id = ?5",
(&title, &body, op_physical(op), &op.hlc, &op.target_id),
)?;
}
}
}
Ok(())
}
fn node_tombstone(tx: &Connection, op: &Op) -> Result<()> {
// Monotonic: tombstone always wins. Bump hlc only if this op is newer.
if let Some(existing) = nodes::get(tx, &op.target_id)? {
let hlc = if op.hlc.as_str() > existing.hlc.as_str() {
op.hlc.clone()
} else {
existing.hlc.clone()
};
tx.execute(
"UPDATE nodes SET tombstoned = 1, modified_at = ?1, hlc = ?2 WHERE id = ?3",
(op_physical(op), hlc, &op.target_id),
)?;
}
Ok(())
}
fn task_set(tx: &Connection, op: &Op) -> Result<()> {
// The backing node must already exist (ordered delivery: create precedes).
let Some(node) = nodes::get(tx, &op.target_id)? else {
return Ok(());
};
let p = &op.payload;
let task_exists: bool = tx
.query_row(
"SELECT 1 FROM tasks WHERE node_id = ?1",
[&op.target_id],
|_| Ok(()),
)
.optional()?
.is_some();
let op_wins = op.hlc.as_str() > node.hlc.as_str();
// Conflict: a different device's scalar snapshot differs from ours.
if task_exists && cross_origin(op, &node.hlc) {
let cur: Option<(Option<i64>, String)> = tx
.query_row(
"SELECT do_date, state FROM tasks WHERE node_id = ?1",
[&op.target_id],
|r| Ok((r.get(0)?, r.get(1)?)),
)
.optional()?;
if let Some((cur_do, cur_state)) = cur {
let op_do = i64_field(p, "do_date");
let op_state = str_field(p, "state").unwrap_or("outstanding");
if cur_do != op_do {
record_conflict(
tx,
op,
"do_date",
cur_do.map(|v| v.to_string()).as_deref(),
op_do.map(|v| v.to_string()).as_deref(),
&node.hlc,
)?;
}
if cur_state != op_state {
record_conflict(tx, op, "state", Some(&cur_state), Some(op_state), &node.hlc)?;
}
}
}
if !task_exists || op_wins {
let attention = str_field(p, "attention");
let do_date = i64_field(p, "do_date");
let late_on = i64_field(p, "late_on");
let state = str_field(p, "state").unwrap_or("outstanding");
let recurrence = str_field(p, "recurrence");
if task_exists {
tx.execute(
"UPDATE tasks SET attention = ?1, do_date = ?2, late_on = ?3,
state = ?4, recurrence = ?5 WHERE node_id = ?6",
(
attention,
do_date,
late_on,
state,
recurrence,
&op.target_id,
),
)?;
} else {
tx.execute(
"INSERT INTO tasks (node_id, attention, do_date, late_on, state, recurrence)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
(
&op.target_id,
attention,
do_date,
late_on,
state,
recurrence,
),
)?;
}
if op_wins {
tx.execute(
"UPDATE nodes SET modified_at = ?1, hlc = ?2 WHERE id = ?3",
(op_physical(op), &op.hlc, &op.target_id),
)?;
}
}
Ok(())
}
fn link_add(tx: &Connection, op: &Op) -> Result<()> {
let exists: bool = tx
.query_row("SELECT 1 FROM links WHERE id = ?1", [&op.target_id], |_| {
Ok(())
})
.optional()?
.is_some();
if !exists {
let p = &op.payload;
tx.execute(
"INSERT INTO links (id, src_id, dst_id, type, created_at, tombstoned)
VALUES (?1, ?2, ?3, ?4, ?5, 0)",
(
&op.target_id,
str_field(p, "src").unwrap_or(""),
str_field(p, "dst").unwrap_or(""),
str_field(p, "type").unwrap_or("wiki"),
i64_field(p, "created_at").unwrap_or_else(|| op_physical(op)),
),
)?;
}
Ok(())
}
fn link_remove(tx: &Connection, op: &Op) -> Result<()> {
tx.execute(
"UPDATE links SET tombstoned = 1 WHERE id = ?1",
[&op.target_id],
)?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn record_conflict(
tx: &Connection,
op: &Op,
field: &str,
local_val: Option<&str>,
remote_val: Option<&str>,
local_hlc: &str,
) -> Result<()> {
tx.execute(
"INSERT INTO conflicts
(id, owner_id, node_id, field, local_val, remote_val,
local_hlc, remote_hlc, status, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'open', ?9)",
(
new_id(),
&op.owner_id,
&op.target_id,
field,
local_val,
remote_val,
local_hlc,
&op.hlc,
op_physical(op),
),
)?;
Ok(())
}

View file

@ -4,10 +4,12 @@ use std::collections::HashSet;
use rusqlite::{Connection, OptionalExtension, Row};
use super::new_id;
use super::{new_id, next_hlc, ops};
use crate::error::Result;
use crate::extract::extract;
use crate::model::{Link, LinkType};
use crate::oplog::op_type;
use serde_json::json;
const COLUMNS: &str = "id, src_id, dst_id, type, created_at, tombstoned";
@ -26,6 +28,7 @@ fn from_row(row: &Row) -> rusqlite::Result<Link> {
/// Add a typed link.
pub(super) fn add(
conn: &Connection,
owner: &str,
now: i64,
src_id: &str,
dst_id: &str,
@ -50,6 +53,20 @@ pub(super) fn add(
link.created_at,
),
)?;
let hlc = next_hlc(conn, now)?;
ops::record(
conn,
owner,
&hlc,
op_type::LINK_ADD,
&link.id,
json!({
"src": link.src_id,
"dst": link.dst_id,
"type": link.link_type.as_str(),
"created_at": link.created_at,
}),
)?;
Ok(link)
}
@ -128,12 +145,14 @@ pub(super) fn sync_wiki_links(
for (link_id, dst) in &existing {
if !desired_set.contains(dst) {
conn.execute("UPDATE links SET tombstoned = 1 WHERE id = ?1", [link_id])?;
let hlc = next_hlc(conn, now)?;
ops::record(conn, owner, &hlc, op_type::LINK_REMOVE, link_id, json!({}))?;
}
}
// Add links for newly-referenced targets.
for dst in &desired {
if !existing_dsts.contains(dst.as_str()) {
add(conn, now, src_id, dst, LinkType::Wiki)?;
add(conn, owner, now, src_id, dst, LinkType::Wiki)?;
}
}
Ok(())

View file

@ -9,9 +9,12 @@
use rusqlite::Connection;
use super::{links, next_hlc, nodes};
use serde_json::json;
use super::{links, next_hlc, nodes, ops};
use crate::error::{Error, Result};
use crate::model::{LinkType, NodeKind};
use crate::oplog::op_type;
/// The task's log doc id, creating (and linking) it on first use.
pub(super) fn ensure_doc(
@ -35,7 +38,8 @@ pub(super) fn ensure_doc(
Some(String::new()),
);
nodes::insert(conn, &doc)?;
links::add(conn, now, task_id, &doc.id, LinkType::LogOf)?;
nodes::record_create(conn, owner, &doc)?;
links::add(conn, owner, now, task_id, &doc.id, LinkType::LogOf)?;
Ok(doc.id)
}
@ -53,7 +57,15 @@ pub(super) fn append(
let hlc = next_hlc(conn, now)?;
conn.execute(
"UPDATE nodes SET body = ?1, modified_at = ?2, hlc = ?3 WHERE id = ?4",
(&new_body, now, hlc, &doc_id),
(&new_body, now, &hlc, &doc_id),
)?;
ops::record(
conn,
owner,
&hlc,
op_type::NODE_SET,
&doc_id,
json!({ "title": doc.title, "body": new_body }),
)?;
Ok(())
}

View file

@ -9,11 +9,13 @@
//! as free functions over a `&Connection`; the [`Store`] impl here is a thin
//! delegating layer so a transaction can span several of them.
mod apply;
mod exporter;
mod links;
mod log;
mod migrations;
mod nodes;
mod ops;
mod tasks;
pub use migrations::latest_version;
@ -26,7 +28,10 @@ use ulid::Ulid;
use crate::clock::Clock;
use crate::error::{Error, Result};
use crate::hlc::Hlc;
use crate::model::{Attention, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState};
use crate::model::{
Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState,
};
use crate::oplog::Op;
use crate::ranking::RankedTask;
use crate::store::Store;
@ -99,6 +104,20 @@ pub(super) fn meta_set(conn: &Connection, key: &str, value: &str) -> Result<()>
Ok(())
}
/// Advance the persisted clock past a remote `hlc` (encoded), so future local
/// stamps exceed everything we've seen (tech-spec §12). Encoded HLCs sort by
/// causal order, so a string compare suffices.
pub(super) fn absorb_remote_hlc(conn: &Connection, remote: &str) -> Result<()> {
let bump = match meta_get(conn, "last_hlc")? {
Some(last) => remote > last.as_str(),
None => true,
};
if bump {
meta_set(conn, "last_hlc", remote)?;
}
Ok(())
}
/// Ensure this store has a stable device `origin` id.
fn ensure_origin(conn: &Connection) -> Result<()> {
if meta_get(conn, "origin")?.is_none() {
@ -176,7 +195,7 @@ impl Store for LocalStore {
fn tombstone_node(&mut self, id: &str) -> Result<()> {
let now = self.clock.now_ms();
nodes::tombstone(&self.conn, now, id)
nodes::tombstone(&self.conn, &self.owner_id, now, id)
}
fn create_task(&mut self, input: NewTask) -> Result<Task> {
@ -195,12 +214,12 @@ impl Store for LocalStore {
fn skip_recurrence(&mut self, node_id: &str) -> Result<Task> {
let now = self.clock.now_ms();
tasks::skip(&self.conn, now, node_id)
tasks::skip(&self.conn, &self.owner_id, now, node_id)
}
fn set_task_attention(&mut self, node_id: &str, attention: Attention) -> Result<Task> {
let now = self.clock.now_ms();
tasks::set_attention(&self.conn, now, node_id, attention)
tasks::set_attention(&self.conn, &self.owner_id, now, node_id, attention)
}
fn next(&self, scope: Option<&str>, limit: usize) -> Result<Vec<RankedTask>> {
@ -232,7 +251,7 @@ impl Store for LocalStore {
fn add_link(&mut self, src_id: &str, dst_id: &str, link_type: LinkType) -> Result<Link> {
let now = self.clock.now_ms();
links::add(&self.conn, now, src_id, dst_id, link_type)
links::add(&self.conn, &self.owner_id, now, src_id, dst_id, link_type)
}
fn outgoing_links(&self, id: &str) -> Result<Vec<Link>> {
@ -258,6 +277,50 @@ impl Store for LocalStore {
fn export(&self, dir: &std::path::Path) -> Result<usize> {
exporter::export(&self.conn, &self.owner_id, dir)
}
fn ops_since(&self, after: Option<&str>) -> Result<Vec<Op>> {
ops::since(&self.conn, &self.owner_id, after)
}
fn apply_op(&mut self, op: &Op) -> Result<bool> {
apply::apply(&mut self.conn, op)
}
fn adopt_owner(&mut self, canonical: &str) -> Result<()> {
if self.owner_id == canonical {
return Ok(());
}
let old = self.owner_id.clone();
let tx = self.conn.transaction()?;
tx.execute(
"INSERT OR IGNORE INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'user', 0)",
[canonical],
)?;
tx.execute(
"UPDATE nodes SET owner_id = ?1 WHERE owner_id = ?2",
(canonical, &old),
)?;
tx.execute(
"UPDATE oplog SET owner_id = ?1 WHERE owner_id = ?2",
(canonical, &old),
)?;
tx.execute(
"UPDATE conflicts SET owner_id = ?1 WHERE owner_id = ?2",
(canonical, &old),
)?;
tx.execute("DELETE FROM users WHERE id = ?1", [&old])?;
tx.commit()?;
self.owner_id = canonical.to_string();
Ok(())
}
fn conflicts_list(&self) -> Result<Vec<Conflict>> {
apply::list_conflicts(&self.conn, &self.owner_id)
}
fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()> {
apply::resolve_conflict(&self.conn, id, choice)
}
}
#[cfg(test)]

View file

@ -2,9 +2,22 @@
use rusqlite::{Connection, OptionalExtension, Row};
use super::{links, new_id, next_hlc};
use serde_json::json;
use super::{links, new_id, next_hlc, ops};
use crate::error::{Error, Result};
use crate::model::{deterministic_id, NewNode, Node, NodeKind};
use crate::oplog::op_type;
/// Op payload describing a node's identity/content for `node.create`.
fn create_payload(node: &Node) -> serde_json::Value {
json!({
"kind": node.kind.as_str(),
"title": node.title,
"body": node.body,
"created_at": node.created_at,
})
}
/// The `nodes` columns in a fixed order, shared by every SELECT here.
pub(super) const COLUMNS: &str =
@ -33,6 +46,19 @@ pub(super) fn build(
}
}
/// Record a `node.create` op for an already-inserted node (used when a caller
/// builds + inserts a node directly, e.g. `task.create`).
pub(super) fn record_create(conn: &Connection, owner: &str, node: &Node) -> Result<()> {
ops::record(
conn,
owner,
&node.hlc,
op_type::NODE_CREATE,
&node.id,
create_payload(node),
)
}
/// Insert a fully-formed [`Node`] row.
pub(super) fn insert(conn: &Connection, node: &Node) -> Result<()> {
conn.execute(
@ -69,11 +95,19 @@ pub(super) fn from_row(row: &Row) -> rusqlite::Result<Node> {
})
}
/// Create and persist a node.
/// Create and persist a node, recording a `node.create` op.
pub(super) fn create(conn: &Connection, owner: &str, now: i64, input: NewNode) -> Result<Node> {
let hlc = next_hlc(conn, now)?;
let node = build(owner, now, &hlc, input.kind, input.title, input.body);
insert(conn, &node)?;
ops::record(
conn,
owner,
&node.hlc,
op_type::NODE_CREATE,
&node.id,
create_payload(&node),
)?;
Ok(node)
}
@ -107,6 +141,14 @@ pub(super) fn open_or_create_journal(
tombstoned: false,
};
insert(conn, &node)?;
ops::record(
conn,
owner,
&node.hlc,
op_type::NODE_CREATE,
&node.id,
create_payload(&node),
)?;
Ok(node)
}
@ -170,6 +212,14 @@ pub(super) fn update(
&node.id,
),
)?;
ops::record(
&tx,
owner,
&node.hlc,
op_type::NODE_SET,
&node.id,
json!({ "title": node.title, "body": node.body }),
)?;
if body_changed {
links::sync_wiki_links(
&tx,
@ -211,25 +261,15 @@ pub(super) fn aliases(conn: &Connection, id: &str) -> Result<Vec<String>> {
/// Tombstone (soft-delete) a node. No hard deletes — tombstones keep merge
/// monotonic (tech-spec §4.3).
pub(super) fn tombstone(conn: &Connection, now: i64, id: &str) -> Result<()> {
pub(super) fn tombstone(conn: &Connection, owner: &str, now: i64, id: &str) -> Result<()> {
let hlc = next_hlc(conn, now)?;
let updated = conn.execute(
"UPDATE nodes SET tombstoned = 1, modified_at = ?1, hlc = ?2 WHERE id = ?3",
(now, hlc, id),
(now, &hlc, id),
)?;
if updated == 0 {
return Err(Error::NodeNotFound(id.to_string()));
}
Ok(())
}
/// Bump `modified_at`/`hlc` on a node (used when a task scalar field changes so
/// the node's modified time reflects the mutation for sync ordering).
pub(super) fn touch(conn: &Connection, now: i64, id: &str) -> Result<()> {
let hlc = next_hlc(conn, now)?;
conn.execute(
"UPDATE nodes SET modified_at = ?1, hlc = ?2 WHERE id = ?3",
(now, hlc, id),
)?;
ops::record(conn, owner, &hlc, op_type::NODE_TOMBSTONE, id, json!({}))?;
Ok(())
}

View file

@ -0,0 +1,97 @@
//! `oplog` table operations — recording local ops and reading them by HLC
//! cursor (tech-spec §12). The merge/apply path lives in [`super::apply`].
use rusqlite::{Connection, Row};
use serde_json::Value;
use super::new_id;
use crate::error::Result;
use crate::hlc::Hlc;
use crate::oplog::Op;
const COLUMNS: &str = "id, owner_id, hlc, origin, op_type, target_id, payload, applied";
/// Record a **local** op (already applied to the materialized tables in this
/// same transaction). The originating device is recovered from `hlc`.
pub(super) fn record(
conn: &Connection,
owner: &str,
hlc: &str,
op_type: &str,
target_id: &str,
payload: Value,
) -> Result<()> {
let origin = Hlc::parse(hlc)?.origin;
conn.execute(
"INSERT INTO oplog (id, owner_id, hlc, origin, op_type, target_id, payload, applied)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1)",
(
new_id(),
owner,
hlc,
origin,
op_type,
target_id,
payload.to_string(),
),
)?;
Ok(())
}
fn from_row(row: &Row) -> rusqlite::Result<Op> {
let payload_text: String = row.get("payload")?;
let payload: Value = serde_json::from_str(&payload_text).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
})?;
Ok(Op {
id: row.get("id")?,
owner_id: row.get("owner_id")?,
hlc: row.get("hlc")?,
origin: row.get("origin")?,
op_type: row.get("op_type")?,
target_id: row.get("target_id")?,
payload,
applied: row.get::<_, i64>("applied")? != 0,
})
}
/// Ops for `owner` with HLC strictly greater than `after` (None ⇒ all), in
/// causal (HLC) order — the push cursor for sync.
pub(super) fn since(conn: &Connection, owner: &str, after: Option<&str>) -> Result<Vec<Op>> {
let sql = format!(
"SELECT {COLUMNS} FROM oplog
WHERE owner_id = ?1 AND (?2 IS NULL OR hlc > ?2)
ORDER BY hlc"
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map((owner, after), from_row)?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
/// Store a foreign [`Op`] verbatim (its own id/hlc/origin), marking whether it
/// was applied to the materialized tables.
pub(super) fn insert_op(conn: &Connection, op: &Op, applied: bool) -> Result<()> {
conn.execute(
"INSERT INTO oplog (id, owner_id, hlc, origin, op_type, target_id, payload, applied)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
(
&op.id,
&op.owner_id,
&op.hlc,
&op.origin,
&op.op_type,
&op.target_id,
op.payload.to_string(),
applied as i64,
),
)?;
Ok(())
}
/// Whether an op with this id has already been recorded (idempotent apply).
pub(super) fn exists(conn: &Connection, op_id: &str) -> Result<bool> {
let n: i64 = conn.query_row("SELECT COUNT(*) FROM oplog WHERE id = ?1", [op_id], |r| {
r.get(0)
})?;
Ok(n > 0)
}

View file

@ -5,12 +5,46 @@
use rusqlite::{Connection, OptionalExtension, Row};
use super::{links, log, next_hlc, nodes};
use serde_json::json;
use super::{links, log, next_hlc, nodes, ops};
use crate::error::{Error, Result};
use crate::model::{Attention, Health, LinkType, NewTask, NodeKind, Task, TaskState};
use crate::oplog::op_type;
use crate::ranking::{self, RankedTask};
use crate::recurrence;
/// JSON payload of a task's scalar fields (for `task.create`/`task.set` ops).
fn scalar_payload(t: &Task) -> serde_json::Value {
json!({
"attention": t.attention.map(|a| a.as_str()),
"do_date": t.do_date,
"late_on": t.late_on,
"state": t.state.as_str(),
"recurrence": t.recurrence,
})
}
/// Bump the task node's hlc/modified_at and record a `task.set` op snapshotting
/// the task's current scalars (LWW unit, tech-spec §12).
fn record_set(conn: &Connection, owner: &str, now: i64, node_id: &str) -> Result<()> {
let task = require(conn, node_id)?;
let hlc = next_hlc(conn, now)?;
conn.execute(
"UPDATE nodes SET modified_at = ?1, hlc = ?2 WHERE id = ?3",
(now, &hlc, node_id),
)?;
ops::record(
conn,
owner,
&hlc,
op_type::TASK_SET,
node_id,
scalar_payload(&task),
)?;
Ok(())
}
fn from_row(row: &Row) -> rusqlite::Result<Task> {
let attention = match row.get::<_, Option<String>>("attention")? {
Some(s) => Some(
@ -57,6 +91,7 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas
None,
);
nodes::insert(&tx, &task_node)?;
nodes::record_create(&tx, owner, &task_node)?;
tx.execute(
"INSERT INTO tasks (node_id, attention, do_date, late_on, state, recurrence)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
@ -69,6 +104,19 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas
&task.recurrence,
),
)?;
let task = Task {
node_id: task_node.id.clone(),
..task
};
let task_create_hlc = next_hlc(&tx, now)?;
ops::record(
&tx,
owner,
&task_create_hlc,
op_type::TASK_CREATE,
&task.node_id,
scalar_payload(&task),
)?;
// The canonical context doc (the task's jumping-off point / checklist body).
let doc_hlc = next_hlc(&tx, now)?;
@ -81,18 +129,30 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas
Some(String::new()),
);
nodes::insert(&tx, &doc)?;
links::add(&tx, now, &task_node.id, &doc.id, LinkType::CanonicalContext)?;
nodes::record_create(&tx, owner, &doc)?;
links::add(
&tx,
owner,
now,
&task.node_id,
&doc.id,
LinkType::CanonicalContext,
)?;
if let Some(project_id) = &input.project_id {
links::add(&tx, now, &task_node.id, project_id, LinkType::InProject)?;
links::add(
&tx,
owner,
now,
&task.node_id,
project_id,
LinkType::InProject,
)?;
}
tx.commit()?;
Ok(Task {
node_id: task_node.id,
..task
})
Ok(task)
}
/// Fetch a task by node id.
@ -128,7 +188,7 @@ pub(super) fn set_state(
"UPDATE tasks SET state = ?1 WHERE node_id = ?2",
(state.as_str(), node_id),
)?;
nodes::touch(conn, now, node_id)?;
record_set(conn, owner, now, node_id)?;
require(conn, node_id)
}
@ -168,7 +228,7 @@ fn roll_forward(conn: &mut Connection, owner: &str, now: i64, task: &Task) -> Re
// 3. Advance the do-date (or finally finish a finite series).
advance(&tx, now, &task.node_id, rrule, task.do_date)?;
nodes::touch(&tx, now, &task.node_id)?;
record_set(&tx, owner, now, &task.node_id)?;
tx.commit()?;
require(conn, &task.node_id)
@ -203,14 +263,14 @@ fn advance(
/// Skip the current occurrence of a recurring task: advance the do-date the same
/// way as completion but **without** logging a completion (tech-spec §4.4).
pub(super) fn skip(conn: &Connection, now: i64, node_id: &str) -> Result<Task> {
pub(super) fn skip(conn: &Connection, owner: &str, now: i64, node_id: &str) -> Result<Task> {
let task = require(conn, node_id)?;
let rrule = task
.recurrence
.as_deref()
.ok_or_else(|| Error::Integrity(format!("skip on non-recurring task {node_id}")))?;
advance(conn, now, node_id, rrule, task.do_date)?;
nodes::touch(conn, now, node_id)?;
record_set(conn, owner, now, node_id)?;
require(conn, node_id)
}
@ -348,6 +408,7 @@ fn load_candidates(conn: &Connection, owner: &str) -> Result<Vec<RankedTask>> {
/// Set a task's attention-state.
pub(super) fn set_attention(
conn: &Connection,
owner: &str,
now: i64,
node_id: &str,
attention: Attention,
@ -359,6 +420,6 @@ pub(super) fn set_attention(
if updated == 0 {
return Err(Error::NodeNotFound(node_id.to_string()));
}
nodes::touch(conn, now, node_id)?;
record_set(conn, owner, now, node_id)?;
require(conn, node_id)
}

View file

@ -5,7 +5,10 @@
//! `RemoteStore`) is configuration. This trait is the seam.
use crate::error::Result;
use crate::model::{Attention, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState};
use crate::model::{
Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState,
};
use crate::oplog::Op;
use crate::ranking::RankedTask;
/// A backend that can store and retrieve nodes, tasks, and links.
@ -106,4 +109,27 @@ pub trait Store {
/// Export every non-tombstoned node to a `.md` directory tree under `dir`,
/// returning the count written (tech-spec §5). One-way; no import.
fn export(&self, dir: &std::path::Path) -> Result<usize>;
// --- sync (op-log) ---
/// Ops for this owner with HLC strictly greater than `after` (None ⇒ all),
/// in causal order — the push cursor for sync (tech-spec §12).
fn ops_since(&self, after: Option<&str>) -> Result<Vec<Op>>;
/// Apply a foreign op with the merge rules (LWW / OR-set / tombstone),
/// idempotently. Returns `true` if newly applied. Ops should be applied in
/// HLC order (tech-spec §12).
fn apply_op(&mut self, op: &Op) -> Result<bool>;
/// Rewrite this replica's `owner_id` to a canonical user id — the one-time,
/// pre-first-sync adoption of tech-spec §13. After this all data is owned by
/// `canonical`; replicas that adopt the same id can sync. (Full adoption,
/// incl. owner-embedded deterministic ids, is refined with auth.)
fn adopt_owner(&mut self, canonical: &str) -> Result<()>;
/// Open merge conflicts surfaced for the user (`heph conflicts`).
fn conflicts_list(&self) -> Result<Vec<Conflict>>;
/// Settle a conflict by the user's choice (`"local"`/`"remote"`).
fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()>;
}

View file

@ -0,0 +1,207 @@
//! Op-log + merge convergence (tech-spec §12, slice 8b/8c). Two local replicas
//! (distinct origins) exchange ops; we assert they converge and that ambiguous
//! merges surface as conflicts. Body merge here is LWW; the yrs text CRDT lands
//! in a follow-up. No network yet — we hand ops across directly.
use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Op, Store, TaskState};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct StepClock(Arc<AtomicI64>);
impl StepClock {
fn new(ms: i64) -> Self {
StepClock(Arc::new(AtomicI64::new(ms)))
}
fn set(&self, ms: i64) {
self.0.store(ms, Ordering::SeqCst);
}
}
impl Clock for StepClock {
fn now_ms(&self) -> i64 {
self.0.load(Ordering::SeqCst)
}
}
/// The shared canonical user both replicas adopt — the §13 model where every
/// device of one human carries the same owner id.
const OWNER: &str = "canonical-user";
fn replica(now: i64) -> (LocalStore, StepClock) {
let c = StepClock::new(now);
let mut s = LocalStore::open_in_memory(Box::new(c.clone())).unwrap();
s.adopt_owner(OWNER).unwrap();
(s, c)
}
/// Push every op from `src` (after `cursor`) into `dst`, in HLC order.
fn sync_one_way(src: &dyn Store, dst: &mut dyn Store, cursor: Option<&str>) -> Option<String> {
let ops: Vec<Op> = src.ops_since(cursor).unwrap();
let mut last = cursor.map(str::to_string);
for op in &ops {
dst.apply_op(op).unwrap();
last = Some(op.hlc.clone());
}
last
}
#[test]
fn online_round_trip_propagates_a_node() {
let (mut a, _ca) = replica(1000);
let (mut b, _cb) = replica(1000);
let n = a
.create_node(NewNode::doc("Roof", "shingles need work"))
.unwrap();
sync_one_way(&a, &mut b, None);
let on_b = b.get_node(&n.id).unwrap().expect("node reached B");
assert_eq!(on_b.title, "Roof");
assert_eq!(on_b.body.as_deref(), Some("shingles need work"));
}
#[test]
fn apply_is_idempotent() {
let (mut a, _ca) = replica(1000);
let (mut b, _cb) = replica(1000);
let n = a.create_node(NewNode::doc("X", "y")).unwrap();
// Apply all of A's ops to B twice — second pass is a no-op.
sync_one_way(&a, &mut b, None);
let again: Vec<Op> = a.ops_since(None).unwrap();
for op in &again {
assert!(
!b.apply_op(op).unwrap(),
"re-applying {} mutated B",
op.op_type
);
}
assert_eq!(b.get_node(&n.id).unwrap().unwrap().title, "X");
}
#[test]
fn offline_divergent_scalar_edits_converge_with_a_conflict() {
// A creates a task; B learns it. Then both go offline and set a different
// do_date. After exchanging, both converge to the higher-HLC value and each
// records a conflict for the discarded value.
let (mut a, ca) = replica(1000);
let (mut b, cb) = replica(1000);
let task = a
.create_task(NewTask {
title: "Renew passport".into(),
attention: Some(Attention::Orange),
..Default::default()
})
.unwrap();
sync_one_way(&a, &mut b, None);
// Divergent offline edits. B's edit is later (higher physical time) → wins.
ca.set(2000);
a.set_task_state(&task.node_id, TaskState::Done).unwrap();
cb.set(3000);
b.set_task_attention(&task.node_id, Attention::Red).unwrap();
// Give each a distinct do_date too (the conflicting field).
// (set_task_* above already produced task.set ops snapshotting scalars.)
// Exchange the divergent ops.
sync_one_way(&a, &mut b, None);
sync_one_way(&b, &mut a, None);
// Both replicas converge to identical task scalars.
let ta = a.get_task(&task.node_id).unwrap().unwrap();
let tb = b.get_task(&task.node_id).unwrap().unwrap();
assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}");
// B wrote last (t=3000) → its attention=red wins on both.
assert_eq!(ta.attention, Some(Attention::Red));
// Each replica surfaced the divergence as a conflict (not silently merged).
assert!(
!a.conflicts_list().unwrap().is_empty(),
"A recorded no conflict"
);
assert!(
!b.conflicts_list().unwrap().is_empty(),
"B recorded no conflict"
);
}
#[test]
fn concurrent_body_edits_converge_lww() {
let (mut a, ca) = replica(1000);
let (mut b, cb) = replica(1000);
let n = a.create_node(NewNode::doc("Note", "base")).unwrap();
sync_one_way(&a, &mut b, None);
ca.set(2000);
a.update_node(&n.id, None, Some("A's edit".into())).unwrap();
cb.set(2500);
b.update_node(&n.id, None, Some("B's edit".into())).unwrap();
sync_one_way(&a, &mut b, None);
sync_one_way(&b, &mut a, None);
let ba = a.get_node(&n.id).unwrap().unwrap().body;
let bb = b.get_node(&n.id).unwrap().unwrap().body;
assert_eq!(ba, bb, "bodies did not converge");
assert_eq!(ba.as_deref(), Some("B's edit")); // later HLC wins
}
#[test]
fn links_are_an_or_set() {
// A and B each add a different link to the same node; both survive.
let (mut a, ca) = replica(1000);
let (mut b, cb) = replica(1000);
let src = a.create_node(NewNode::doc("src", "")).unwrap();
let d1 = a.create_node(NewNode::doc("d1", "")).unwrap();
sync_one_way(&a, &mut b, None);
let d2 = b.create_node(NewNode::doc("d2", "")).unwrap();
sync_one_way(&b, &mut a, None);
ca.set(2000);
a.add_link(&src.id, &d1.id, heph_core::LinkType::Blocks)
.unwrap();
cb.set(2000);
b.add_link(&src.id, &d2.id, heph_core::LinkType::Blocks)
.unwrap();
sync_one_way(&a, &mut b, None);
sync_one_way(&b, &mut a, None);
let dsts_a: Vec<String> = a
.outgoing_links(&src.id)
.unwrap()
.into_iter()
.filter(|l| l.link_type == heph_core::LinkType::Blocks)
.map(|l| l.dst_id)
.collect();
let mut dsts_b: Vec<String> = b
.outgoing_links(&src.id)
.unwrap()
.into_iter()
.filter(|l| l.link_type == heph_core::LinkType::Blocks)
.map(|l| l.dst_id)
.collect();
let mut sorted_a = dsts_a.clone();
sorted_a.sort();
dsts_b.sort();
assert_eq!(sorted_a, dsts_b, "link sets did not converge");
assert!(dsts_b.contains(&d1.id) && dsts_b.contains(&d2.id));
}
#[test]
fn tombstones_propagate_and_are_monotonic() {
let (mut a, _ca) = replica(1000);
let (mut b, _cb) = replica(1000);
let n = a.create_node(NewNode::doc("doomed", "")).unwrap();
sync_one_way(&a, &mut b, None);
a.tombstone_node(&n.id).unwrap();
sync_one_way(&a, &mut b, None);
assert!(b.get_node(&n.id).unwrap().unwrap().tombstoned);
// Tombstoned nodes drop out of search/next on B.
assert!(b.search("doomed").unwrap().is_empty());
}