Phase 1: v1 prototype #1

Merged
eblume merged 91 commits from feature/v1-prototype into main 2026-06-03 20:48:23 -07:00
9 changed files with 416 additions and 20 deletions
Showing only changes of commit c81d45a291 - Show all commits

heph-core: real HLC + persistent device origin (sync 8a)
Some checks failed
Build / validate (pull_request) Failing after 4s

First sync slice. Ratifies yrs for body merge in the tech-spec.

- hlc module: Hlc (physical, counter, origin) with a fixed-width encoding
  whose lexical order equals causal order; HlcClock generator (tick/update)
  — clock-injected, strictly monotonic. Unit + 2 proptests.
- meta table (migration v3) holds the stable per-device `origin` and the
  last HLC. next_hlc() does a read-modify-write inside the caller's
  transaction (store is single-writer, so no race), replacing the
  timestamp placeholder. Every node write is now stamped with a real,
  monotonic, causally-ordered HLC.

4 stamping integration tests (monotonic under stalled/regressed clock;
origin shared + persists across reopen; HLC resumes from persisted state).
89 tests green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Erich Blume 2026-05-31 21:13:55 -07:00

222
crates/heph-core/src/hlc.rs Normal file
View file

@ -0,0 +1,222 @@
//! Hybrid Logical Clock (tech-spec §12).
//!
//! Every operation is stamped with an [`Hlc`] so concurrent edits across
//! offline devices have a deterministic causal order. An HLC blends physical
//! wall-clock ms (from the injected clock — never read ambiently here) with a
//! logical counter that breaks ties when physical time doesn't advance, plus
//! the originating device id as a final tiebreak.
//!
//! The encoded form is fixed-width so lexical string order equals causal order
//! — that's what the `hlc` TEXT columns and op-log cursor rely on.
use crate::error::{Error, Result};
/// A hybrid logical clock timestamp.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Hlc {
/// Physical time, epoch ms.
pub physical: i64,
/// Logical counter (ties when physical time repeats).
pub counter: u32,
/// Originating device id (final tiebreak; keeps the order total).
pub origin: String,
}
impl Hlc {
/// The zero clock for `origin` (precedes every real event).
pub fn zero(origin: impl Into<String>) -> Hlc {
Hlc {
physical: 0,
counter: 0,
origin: origin.into(),
}
}
/// Encode to a fixed-width, lexically-sortable string
/// `physical(20):counter(10):origin`.
pub fn encode(&self) -> String {
format!("{:020}:{:010}:{}", self.physical, self.counter, self.origin)
}
/// Parse the [`encode`](Hlc::encode) form. Zero-padding parses fine.
pub fn parse(s: &str) -> Result<Hlc> {
let mut parts = s.splitn(3, ':');
let physical: i64 = parts
.next()
.and_then(|p| p.parse().ok())
.ok_or_else(|| Error::Integrity(format!("bad hlc physical in {s:?}")))?;
let counter: u32 = parts
.next()
.and_then(|p| p.parse().ok())
.ok_or_else(|| Error::Integrity(format!("bad hlc counter in {s:?}")))?;
let origin = parts
.next()
.ok_or_else(|| Error::Integrity(format!("missing hlc origin in {s:?}")))?
.to_string();
Ok(Hlc {
physical,
counter,
origin,
})
}
}
/// A per-device HLC generator. Holds the device `origin` and the last emitted
/// clock; deterministic given the injected physical time.
#[derive(Clone, Debug)]
pub struct HlcClock {
origin: String,
last: Hlc,
}
impl HlcClock {
/// Start a generator for `origin`, resuming from `last` (use [`Hlc::zero`]
/// for a fresh device).
pub fn new(origin: impl Into<String>, last: Hlc) -> HlcClock {
HlcClock {
origin: origin.into(),
last,
}
}
/// This device's id.
pub fn origin(&self) -> &str {
&self.origin
}
/// The last clock this generator emitted.
pub fn last(&self) -> &Hlc {
&self.last
}
/// Stamp a **local** event at physical time `now_ms`. Strictly greater than
/// every clock this generator has emitted.
pub fn tick(&mut self, now_ms: i64) -> Hlc {
let physical = now_ms.max(self.last.physical);
let counter = if physical == self.last.physical {
self.last.counter + 1
} else {
0
};
let hlc = Hlc {
physical,
counter,
origin: self.origin.clone(),
};
self.last = hlc.clone();
hlc
}
/// Stamp the **receipt** of a remote event `remote` at physical time
/// `now_ms`, advancing past both our last clock and the remote's.
pub fn update(&mut self, remote: &Hlc, now_ms: i64) -> Hlc {
let physical = now_ms.max(self.last.physical).max(remote.physical);
let counter = if physical == self.last.physical && physical == remote.physical {
self.last.counter.max(remote.counter) + 1
} else if physical == self.last.physical {
self.last.counter + 1
} else if physical == remote.physical {
remote.counter + 1
} else {
0
};
let hlc = Hlc {
physical,
counter,
origin: self.origin.clone(),
};
self.last = hlc.clone();
hlc
}
}
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
#[test]
fn encode_is_lexically_sortable_by_causal_order() {
let a = Hlc {
physical: 5,
counter: 9,
origin: "z".into(),
};
let b = Hlc {
physical: 5,
counter: 10,
origin: "a".into(),
};
// b is causally after a (higher counter) despite a smaller origin.
assert!(b > a);
assert!(b.encode() > a.encode());
}
#[test]
fn encode_parse_round_trips() {
let h = Hlc {
physical: 1_700_000_000_000,
counter: 42,
origin: "dev-1".into(),
};
assert_eq!(Hlc::parse(&h.encode()).unwrap(), h);
// Zero parses back too.
let z = Hlc::zero("dev-1");
assert_eq!(Hlc::parse(&z.encode()).unwrap(), z);
}
#[test]
fn tick_is_strictly_monotonic_even_when_clock_stalls() {
let mut c = HlcClock::new("d", Hlc::zero("d"));
let a = c.tick(100);
let b = c.tick(100); // same physical → counter bumps
let d = c.tick(50); // clock went backwards → still advances via physical=max
assert!(b > a);
assert!(d > b);
assert_eq!(a.physical, 100);
assert_eq!(b.counter, a.counter + 1);
}
#[test]
fn update_absorbs_a_remote_clock() {
let mut c = HlcClock::new("local", Hlc::zero("local"));
c.tick(10);
let remote = Hlc {
physical: 1000,
counter: 3,
origin: "other".into(),
};
let got = c.update(&remote, 20);
assert!(got > remote);
assert_eq!(got.physical, 1000);
assert_eq!(got.counter, 4);
assert_eq!(got.origin, "local");
}
proptest! {
/// A run of ticks is strictly increasing for any physical-time sequence.
#[test]
fn ticks_strictly_increase(times in proptest::collection::vec(0i64..1_000_000, 1..50)) {
let mut c = HlcClock::new("d", Hlc::zero("d"));
let mut prev: Option<Hlc> = None;
for t in times {
let h = c.tick(t);
if let Some(p) = &prev {
prop_assert!(h > *p);
}
prev = Some(h);
}
}
/// Encoding preserves order for any two clocks.
#[test]
fn encode_preserves_order(
p1 in 0i64..1_000_000_000_000, c1 in 0u32..100_000, o1 in "[a-z]{1,4}",
p2 in 0i64..1_000_000_000_000, c2 in 0u32..100_000, o2 in "[a-z]{1,4}",
) {
let a = Hlc { physical: p1, counter: c1, origin: o1 };
let b = Hlc { physical: p2, counter: c2, origin: o2 };
prop_assert_eq!(a.cmp(&b), a.encode().cmp(&b.encode()));
}
}
}

View file

@ -12,6 +12,7 @@ pub mod clock;
pub mod error;
pub mod export;
pub mod extract;
pub mod hlc;
pub mod model;
pub mod ranking;
pub mod recurrence;
@ -22,6 +23,7 @@ pub use clock::{Clock, FixedClock};
pub use error::{Error, Result};
pub use export::{render as render_export, ExportFile, NodeExport};
pub use extract::{extract, ContextItem, Extraction};
pub use hlc::{Hlc, HlcClock};
pub use model::{
deterministic_id, Attention, Health, Link, LinkType, NewNode, NewTask, Node, NodeKind, Task,
TaskState,

View file

@ -9,7 +9,7 @@
use rusqlite::Connection;
use super::{hlc_for, links, nodes};
use super::{links, next_hlc, nodes};
use crate::error::{Error, Result};
use crate::model::{LinkType, NodeKind};
@ -25,9 +25,11 @@ pub(super) fn ensure_doc(
}
let task =
nodes::get(conn, task_id)?.ok_or_else(|| Error::NodeNotFound(task_id.to_string()))?;
let hlc = next_hlc(conn, now)?;
let doc = nodes::build(
owner,
now,
&hlc,
NodeKind::Doc,
format!("{} — log", task.title),
Some(String::new()),
@ -48,9 +50,10 @@ pub(super) fn append(
let doc_id = ensure_doc(conn, owner, now, task_id)?;
let doc = nodes::get(conn, &doc_id)?.ok_or_else(|| Error::NodeNotFound(doc_id.clone()))?;
let new_body = append_line(doc.body.as_deref().unwrap_or(""), text);
let hlc = next_hlc(conn, now)?;
conn.execute(
"UPDATE nodes SET body = ?1, modified_at = ?2, hlc = ?3 WHERE id = ?4",
(&new_body, now, hlc_for(now), &doc_id),
(&new_body, now, hlc, &doc_id),
)?;
Ok(())
}

View file

@ -10,7 +10,11 @@ use rusqlite::Connection;
/// The ordered list of migrations. Never reorder or mutate a shipped entry —
/// only append.
const MIGRATIONS: &[(i64, &str)] = &[(1, MIGRATION_0001), (2, MIGRATION_0002)];
const MIGRATIONS: &[(i64, &str)] = &[
(1, MIGRATION_0001),
(2, MIGRATION_0002),
(3, MIGRATION_0003),
];
/// v1 — the base node graph, identity, and sync scaffolding (tech-spec §4.5).
const MIGRATION_0001: &str = r#"
@ -115,6 +119,15 @@ CREATE TRIGGER nodes_au AFTER UPDATE ON nodes BEGIN
END;
"#;
/// v3 — per-store key/value metadata (e.g. this device's stable `origin` id for
/// HLC stamping and sync, tech-spec §12).
const MIGRATION_0003: &str = r#"
CREATE TABLE meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
"#;
/// Apply all pending migrations to `conn`.
pub fn migrate(conn: &Connection) -> Result<()> {
let current: i64 = conn.query_row("PRAGMA user_version", [], |r| r.get(0))?;

View file

@ -24,7 +24,8 @@ use rusqlite::{Connection, OptionalExtension};
use ulid::Ulid;
use crate::clock::Clock;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::hlc::Hlc;
use crate::model::{Attention, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState};
use crate::ranking::RankedTask;
use crate::store::Store;
@ -52,6 +53,7 @@ impl LocalStore {
fn init(conn: Connection, clock: Box<dyn Clock>) -> Result<Self> {
conn.execute_batch("PRAGMA foreign_keys = ON;")?;
migrations::migrate(&conn)?;
ensure_origin(&conn)?;
let owner_id = ensure_local_user(&conn, clock.as_ref())?;
Ok(LocalStore {
conn,
@ -67,6 +69,12 @@ impl LocalStore {
pub fn owner_id(&self) -> &str {
&self.owner_id
}
/// This device's stable `origin` id (HLC/sync identity).
pub fn origin(&self) -> Result<String> {
meta_get(&self.conn, "origin")?
.ok_or_else(|| Error::Integrity("missing device origin".into()))
}
}
/// A fresh ULID, as a string id.
@ -74,10 +82,56 @@ pub(crate) fn new_id() -> String {
Ulid::new().to_string()
}
/// Placeholder HLC string until the real hybrid logical clock lands (§12).
/// Zero-padded epoch ms keeps it lexically sortable in the meantime.
pub(crate) fn hlc_for(now_ms: i64) -> String {
format!("{now_ms:016}")
/// Read a `meta` value.
pub(super) fn meta_get(conn: &Connection, key: &str) -> Result<Option<String>> {
Ok(conn
.query_row("SELECT value FROM meta WHERE key = ?1", [key], |r| r.get(0))
.optional()?)
}
/// Upsert a `meta` value.
pub(super) fn meta_set(conn: &Connection, key: &str, value: &str) -> Result<()> {
conn.execute(
"INSERT INTO meta (key, value) VALUES (?1, ?2)
ON CONFLICT(key) DO UPDATE SET value = excluded.value",
(key, value),
)?;
Ok(())
}
/// Ensure this store has a stable device `origin` id.
fn ensure_origin(conn: &Connection) -> Result<()> {
if meta_get(conn, "origin")?.is_none() {
meta_set(conn, "origin", &new_id())?;
}
Ok(())
}
/// Generate the next local [`Hlc`] (encoded), advancing the persisted clock in
/// `meta`. Strictly greater than every previously-generated stamp; the
/// read-modify-write runs inside the caller's connection/transaction and the
/// store is single-writer, so there is no race.
pub(super) fn next_hlc(conn: &Connection, now_ms: i64) -> Result<String> {
let origin = meta_get(conn, "origin")?
.ok_or_else(|| Error::Integrity("missing device origin".into()))?;
let last = match meta_get(conn, "last_hlc")? {
Some(s) => Hlc::parse(&s)?,
None => Hlc::zero(&origin),
};
let physical = now_ms.max(last.physical);
let counter = if physical == last.physical {
last.counter + 1
} else {
0
};
let encoded = Hlc {
physical,
counter,
origin,
}
.encode();
meta_set(conn, "last_hlc", &encoded)?;
Ok(encoded)
}
/// Ensure a single local user exists, returning its id.

View file

@ -2,7 +2,7 @@
use rusqlite::{Connection, OptionalExtension, Row};
use super::{hlc_for, links, new_id};
use super::{links, new_id, next_hlc};
use crate::error::{Error, Result};
use crate::model::{deterministic_id, NewNode, Node, NodeKind};
@ -10,10 +10,12 @@ use crate::model::{deterministic_id, NewNode, Node, NodeKind};
pub(super) const COLUMNS: &str =
"id, owner_id, kind, title, body, created_at, modified_at, hlc, tombstoned";
/// Build an in-memory [`Node`] (not yet persisted).
/// Build an in-memory [`Node`] (not yet persisted) stamped with `hlc`. The
/// caller generates the HLC (it needs the connection) and passes it in.
pub(super) fn build(
owner: &str,
now: i64,
hlc: &str,
kind: NodeKind,
title: String,
body: Option<String>,
@ -26,7 +28,7 @@ pub(super) fn build(
body,
created_at: now,
modified_at: now,
hlc: hlc_for(now),
hlc: hlc.to_string(),
tombstoned: false,
}
}
@ -69,7 +71,8 @@ pub(super) fn from_row(row: &Row) -> rusqlite::Result<Node> {
/// Create and persist a node.
pub(super) fn create(conn: &Connection, owner: &str, now: i64, input: NewNode) -> Result<Node> {
let node = build(owner, now, input.kind, input.title, input.body);
let hlc = next_hlc(conn, now)?;
let node = build(owner, now, &hlc, input.kind, input.title, input.body);
insert(conn, &node)?;
Ok(node)
}
@ -100,7 +103,7 @@ pub(super) fn open_or_create_journal(
body: Some(String::new()),
created_at: now,
modified_at: now,
hlc: hlc_for(now),
hlc: next_hlc(conn, now)?,
tombstoned: false,
};
insert(conn, &node)?;
@ -154,9 +157,9 @@ pub(super) fn update(
None => false,
};
node.modified_at = now;
node.hlc = hlc_for(now);
let tx = conn.transaction()?;
node.hlc = next_hlc(&tx, now)?;
tx.execute(
"UPDATE nodes SET title = ?1, body = ?2, modified_at = ?3, hlc = ?4 WHERE id = ?5",
(
@ -209,9 +212,10 @@ pub(super) fn aliases(conn: &Connection, id: &str) -> Result<Vec<String>> {
/// Tombstone (soft-delete) a node. No hard deletes — tombstones keep merge
/// monotonic (tech-spec §4.3).
pub(super) fn tombstone(conn: &Connection, now: i64, id: &str) -> Result<()> {
let hlc = next_hlc(conn, now)?;
let updated = conn.execute(
"UPDATE nodes SET tombstoned = 1, modified_at = ?1, hlc = ?2 WHERE id = ?3",
(now, hlc_for(now), id),
(now, hlc, id),
)?;
if updated == 0 {
return Err(Error::NodeNotFound(id.to_string()));
@ -222,9 +226,10 @@ pub(super) fn tombstone(conn: &Connection, now: i64, id: &str) -> Result<()> {
/// Bump `modified_at`/`hlc` on a node (used when a task scalar field changes so
/// the node's modified time reflects the mutation for sync ordering).
pub(super) fn touch(conn: &Connection, now: i64, id: &str) -> Result<()> {
let hlc = next_hlc(conn, now)?;
conn.execute(
"UPDATE nodes SET modified_at = ?1, hlc = ?2 WHERE id = ?3",
(now, hlc_for(now), id),
(now, hlc, id),
)?;
Ok(())
}

View file

@ -5,7 +5,7 @@
use rusqlite::{Connection, OptionalExtension, Row};
use super::{hlc_for, links, log, nodes};
use super::{links, log, next_hlc, nodes};
use crate::error::{Error, Result};
use crate::model::{Attention, Health, LinkType, NewTask, NodeKind, Task, TaskState};
use crate::ranking::{self, RankedTask};
@ -47,7 +47,15 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas
let tx = conn.transaction()?;
let task_node = nodes::build(owner, now, NodeKind::Task, input.title.clone(), None);
let task_hlc = next_hlc(&tx, now)?;
let task_node = nodes::build(
owner,
now,
&task_hlc,
NodeKind::Task,
input.title.clone(),
None,
);
nodes::insert(&tx, &task_node)?;
tx.execute(
"INSERT INTO tasks (node_id, attention, do_date, late_on, state, recurrence)
@ -63,9 +71,11 @@ pub(super) fn create(conn: &mut Connection, owner: &str, now: i64, input: NewTas
)?;
// The canonical context doc (the task's jumping-off point / checklist body).
let doc_hlc = next_hlc(&tx, now)?;
let doc = nodes::build(
owner,
now,
&doc_hlc,
NodeKind::Doc,
input.title.clone(),
Some(String::new()),
@ -139,9 +149,10 @@ fn roll_forward(conn: &mut Connection, owner: &str, now: i64, task: &Task) -> Re
let body = doc.body.unwrap_or_default();
let reset = recurrence::reset_checkboxes(&body);
if reset != body {
let hlc = next_hlc(&tx, now)?;
tx.execute(
"UPDATE nodes SET body = ?1, modified_at = ?2, hlc = ?3 WHERE id = ?4",
(&reset, now, hlc_for(now), &doc_id),
(&reset, now, hlc, &doc_id),
)?;
links::sync_wiki_links(&tx, owner, &doc_id, &reset, now)?;
}

View file

@ -0,0 +1,86 @@
//! The store stamps every node write with a real, monotonic HLC, and the
//! device origin persists across reopens (tech-spec §12, slice 8a).
use heph_core::{Clock, Hlc, LocalStore, NewNode, Store};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct StepClock(Arc<AtomicI64>);
impl StepClock {
fn new(ms: i64) -> Self {
StepClock(Arc::new(AtomicI64::new(ms)))
}
fn set(&self, ms: i64) {
self.0.store(ms, Ordering::SeqCst);
}
}
impl Clock for StepClock {
fn now_ms(&self) -> i64 {
self.0.load(Ordering::SeqCst)
}
}
#[test]
fn node_hlcs_strictly_increase_even_when_the_clock_stalls_or_regresses() {
let clock = StepClock::new(1000);
let mut s = LocalStore::open_in_memory(Box::new(clock.clone())).unwrap();
let a = s.create_node(NewNode::doc("a", "")).unwrap();
// Same wall-clock instant → the logical counter must still advance.
let b = s.create_node(NewNode::doc("b", "")).unwrap();
// Clock regresses → HLC must not go backwards.
clock.set(500);
let c = s.create_node(NewNode::doc("c", "")).unwrap();
let ha = Hlc::parse(&a.hlc).unwrap();
let hb = Hlc::parse(&b.hlc).unwrap();
let hc = Hlc::parse(&c.hlc).unwrap();
assert!(hb > ha, "{hb:?} !> {ha:?}");
assert!(hc > hb, "{hc:?} !> {hb:?}");
// Lexical order of the encoded strings matches causal order.
assert!(a.hlc < b.hlc && b.hlc < c.hlc);
}
#[test]
fn all_hlcs_share_this_devices_origin() {
let mut s = LocalStore::open_in_memory(Box::new(StepClock::new(0))).unwrap();
let origin = s.origin().unwrap();
let n = s.create_node(NewNode::doc("x", "")).unwrap();
assert_eq!(Hlc::parse(&n.hlc).unwrap().origin, origin);
}
#[test]
fn origin_persists_across_reopen() {
let dir = tempfile::tempdir().unwrap();
let db = dir.path().join("heph.db");
let origin1 = {
let s = LocalStore::open(&db, Box::new(StepClock::new(1))).unwrap();
s.origin().unwrap()
};
let origin2 = {
let s = LocalStore::open(&db, Box::new(StepClock::new(2))).unwrap();
s.origin().unwrap()
};
assert_eq!(origin1, origin2);
assert!(!origin1.is_empty());
}
#[test]
fn hlc_keeps_advancing_after_reopen() {
let dir = tempfile::tempdir().unwrap();
let db = dir.path().join("heph.db");
let first_hlc = {
let mut s = LocalStore::open(&db, Box::new(StepClock::new(1000))).unwrap();
s.create_node(NewNode::doc("a", "")).unwrap().hlc
};
// Reopen with an earlier wall clock; the next stamp must still exceed the
// persisted one (the clock resumes from the stored last_hlc).
let next_hlc = {
let mut s = LocalStore::open(&db, Box::new(StepClock::new(500))).unwrap();
s.create_node(NewNode::doc("b", "")).unwrap().hlc
};
assert!(next_hlc > first_hlc, "{next_hlc} !> {first_hlc}");
}

View file

@ -272,7 +272,7 @@ All layers are required; CI runs them on every push/PR (extend `.forgejo/scripts
**Added for v1 client/server + auth (some to confirm at kickoff):**
- **Text CRDT (body merge):** `yrs` (Rust Yjs) — *leaning*; alternative `automerge`. Used for `doc`/`journal`/log bodies. Structured fields use a bespoke op-log + HLC (no library needed).
- **Text CRDT (body merge):** **`yrs` (Rust Yjs)** (ratified at the Phase 1 sync kickoff, 2026-05-31; `automerge` was the alternative). Used for `doc`/`journal`/log bodies. Structured fields use a bespoke op-log + HLC (no library needed).
- **HLC:** small bespoke hybrid-logical-clock (or a crate) — deterministic, clock-injected.
- **Hub network transport:** `axum` (HTTP/JSON) for the sync endpoint — *leaning* (reuses the eventual web-UI server); `reqwest` on the client side.
- **OIDC:** `openidconnect` crate for the Authentik device-code flow; tokens cached in the OS keychain (`keyring`) / 1Password.