hephaestus/crates/heph-core/tests/convergence.rs
Erich Blume a0b04eefda
All checks were successful
Build / validate (pull_request) Successful in 10m34s
feat: multi-tenancy seam (resolve_owner) + hub-setup how-to (v1 prep)
The cheap "seam" that keeps the single-owner hub from calcifying, ahead of
the gilbert -> indri bring-up:

- Replace the single-tenant gate `Store::authorize_owner_sub(sub) -> bool`
  with `resolve_owner(sub) -> Option<owner_id>`. The hub auth middleware now
  resolves the token's identity to the owner it may act as (Some -> allow,
  None -> 403). Behavior is identical for the single-owner hub (claim-on-first;
  strangers still 403), but the contract no longer assumes one global owner, so
  serving N owners later is additive, not a rewrite. The resolved per-request owner
  is marked at the exact point where downstream per-owner scoping will hook in.
- New how-to docs/how-to/set-up-sync-hub.md: stand up the hub and connect an
  existing device as an offline-capable spoke, the data-safe way (Path A: the
  hub adopts the device's identity rather than rewriting the device).

The decision (cheap seam now, defer full multi-tenancy + adoption rewrite) is
recorded in the Adoption + multi-tenant task's context doc. Two enabler gaps
the how-to surfaced (heph daemon hub/spoke service flags; Path-A seeding tool)
are filed as Hephaestus tasks.

Green: 228 tests, clippy -D warnings + fmt + prek clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-04 07:08:39 -07:00

269 lines
8.9 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Op-log + merge convergence (tech-spec §12, slice 8b8d). Two local replicas
//! (distinct origins) exchange ops; we assert they converge, that ambiguous
//! scalar merges surface as conflicts, and that concurrent body edits *merge*
//! through the yrs text CRDT instead of clobbering with last-writer-wins. Also
//! covers link OR-set merging, tombstone propagation, per-peer sync cursors,
//! and the hub's `resolve_owner` seam. No network yet — ops are handed across
//! directly.
use heph_core::{
Attention, Clock, LocalStore, NewNode, NewTask, Op, Store, SyncCursors, TaskState,
};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
/// A hand-cranked clock for deterministic tests.
///
/// Clones share one counter through the `Arc`, so a test can keep a handle
/// and move time forward after giving a clone to a store.
#[derive(Clone)]
struct StepClock(Arc<AtomicI64>);

impl StepClock {
    /// Creates a clock reading `ms` milliseconds.
    fn new(ms: i64) -> Self {
        let counter = AtomicI64::new(ms);
        Self(Arc::new(counter))
    }

    /// Jumps the clock to `ms`; every clone observes the new value.
    fn set(&self, ms: i64) {
        let Self(counter) = self;
        counter.store(ms, Ordering::SeqCst);
    }
}
/// Reports whatever time was last dialled in via `StepClock::set`, or the
/// starting value given to `StepClock::new`.
impl Clock for StepClock {
    fn now_ms(&self) -> i64 {
        let Self(counter) = self;
        counter.load(Ordering::SeqCst)
    }
}
/// Owner id adopted by every replica in these tests: per §13, all devices of
/// one human carry the same owner id.
const OWNER: &str = "canonical-user";

/// Opens a fresh in-memory replica whose clock starts at `now` and which has
/// adopted [`OWNER`]. Returns the store together with a handle to its clock so
/// the test can advance time later.
fn replica(now: i64) -> (LocalStore, StepClock) {
    let clock = StepClock::new(now);
    let mut store = LocalStore::open_in_memory(Box::new(clock.clone())).unwrap();
    store.adopt_owner(OWNER).unwrap();
    (store, clock)
}
/// Push every op from `src` (after `cursor`) into `dst`, in HLC order.
fn sync_one_way(src: &dyn Store, dst: &mut dyn Store, cursor: Option<&str>) -> Option<String> {
let ops: Vec<Op> = src.ops_since(cursor).unwrap();
let mut last = cursor.map(str::to_string);
for op in &ops {
dst.apply_op(op).unwrap();
last = Some(op.hlc.clone());
}
last
}
#[test]
fn sync_cursors_default_empty_then_advance_per_direction() {
    // Per-peer sync cursors start empty, and the push and pull directions
    // advance independently of each other and of other peers.
    const HUB: &str = "https://hub.example";
    let cursors = |push: Option<&str>, pull: Option<&str>| SyncCursors {
        last_pushed_hlc: push.map(Into::into),
        last_pulled_hlc: pull.map(Into::into),
    };
    let (mut a, _ca) = replica(1000);
    assert_eq!(a.sync_state(HUB).unwrap(), SyncCursors::default());

    // Advancing only the push cursor leaves pull unset...
    a.record_sync(HUB, Some("hlc-push-1"), None).unwrap();
    assert_eq!(a.sync_state(HUB).unwrap(), cursors(Some("hlc-push-1"), None));

    // ...and advancing only the pull cursor leaves push where it was.
    a.record_sync(HUB, None, Some("hlc-pull-1")).unwrap();
    assert_eq!(
        a.sync_state(HUB).unwrap(),
        cursors(Some("hlc-push-1"), Some("hlc-pull-1"))
    );

    // A second peer is tracked independently: it does not inherit HUB's
    // cursors, and recording for it leaves HUB's cursors untouched.
    a.record_sync("other", Some("x"), None).unwrap();
    assert_eq!(a.sync_state("other").unwrap(), cursors(Some("x"), None));
    assert_eq!(
        a.sync_state(HUB).unwrap(),
        cursors(Some("hlc-push-1"), Some("hlc-pull-1"))
    );
}
#[test]
fn resolve_owner_claims_first_then_requires_match() {
    // Multi-tenancy seam (§13): the hub maps an OIDC `sub` to an owner id.
    // Whichever sub asks first claims the (for now, single) owner; afterwards
    // only that sub resolves, and always to the same owner id.
    let (mut a, _ca) = replica(1000);

    let claimed = a.resolve_owner("sub-alice").unwrap().expect("first claims");
    let expected = Some(claimed.as_str());

    let repeat = a.resolve_owner("sub-alice").unwrap();
    assert_eq!(
        repeat.as_deref(),
        expected,
        "same sub resolves to the same owner"
    );

    let stranger = a.resolve_owner("sub-mallory").unwrap();
    assert!(
        stranger.is_none(),
        "a different identity does not resolve (the hub then 403s)"
    );

    // Rejecting a stranger must not disturb the original binding.
    let after_rejection = a.resolve_owner("sub-alice").unwrap();
    assert_eq!(after_rejection.as_deref(), expected);
}
#[test]
fn online_round_trip_propagates_a_node() {
    // A node created on A shows up on B, title and body intact, after one push.
    let (mut a, _ca) = replica(1000);
    let (mut b, _cb) = replica(1000);
    let roof = a
        .create_node(NewNode::doc("Roof", "shingles need work"))
        .unwrap();

    sync_one_way(&a, &mut b, None);

    let replicated = b.get_node(&roof.id).unwrap().expect("node reached B");
    assert_eq!(replicated.title, "Roof");
    assert_eq!(replicated.body.as_deref(), Some("shingles need work"));
}
#[test]
fn apply_is_idempotent() {
    // Delivering A's ops to B a second time must be a no-op.
    let (mut a, _ca) = replica(1000);
    let (mut b, _cb) = replica(1000);
    let node = a.create_node(NewNode::doc("X", "y")).unwrap();

    // First delivery populates B.
    sync_one_way(&a, &mut b, None);

    // Second delivery: every apply_op must report that nothing changed.
    let replay: Vec<Op> = a.ops_since(None).unwrap();
    for op in &replay {
        let changed = b.apply_op(op).unwrap();
        assert!(!changed, "re-applying {} mutated B", op.op_type);
    }
    assert_eq!(b.get_node(&node.id).unwrap().unwrap().title, "X");
}
#[test]
fn offline_divergent_scalar_edits_converge_with_a_conflict() {
    // A creates a task; B learns it. Then both go offline and change the
    // task's scalars differently: A marks it Done, B (later) raises attention
    // to Red. Each set_task_* call produces a task.set op snapshotting the
    // task's scalars, so the two snapshots disagree. After exchanging ops,
    // both replicas must converge to the same task, B's higher-HLC attention
    // must win, and each replica must record the divergence as a conflict.
    let (mut a, ca) = replica(1000);
    let (mut b, cb) = replica(1000);
    let task = a
        .create_task(NewTask {
            title: "Renew passport".into(),
            attention: Some(Attention::Orange),
            ..Default::default()
        })
        .unwrap();
    sync_one_way(&a, &mut b, None);

    // Divergent offline edits. B's edit is later (higher physical time) → wins.
    ca.set(2000);
    a.set_task_state(&task.node_id, TaskState::Done).unwrap();
    cb.set(3000);
    b.set_task_attention(&task.node_id, Attention::Red).unwrap();

    // Exchange the divergent ops (full replays from the start; ops a replica
    // already holds are no-ops, per `apply_is_idempotent`).
    sync_one_way(&a, &mut b, None);
    sync_one_way(&b, &mut a, None);

    // Both replicas converge to identical task scalars.
    let ta = a.get_task(&task.node_id).unwrap().unwrap();
    let tb = b.get_task(&task.node_id).unwrap().unwrap();
    assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}");
    // B wrote last (t=3000) → its attention=red wins on both.
    assert_eq!(ta.attention, Some(Attention::Red));
    // Each replica surfaced the divergence as a conflict (not silently merged).
    assert!(
        !a.conflicts_list().unwrap().is_empty(),
        "A recorded no conflict"
    );
    assert!(
        !b.conflicts_list().unwrap().is_empty(),
        "B recorded no conflict"
    );
}
#[test]
fn concurrent_body_edits_merge_via_crdt() {
    // Offline, each replica edits a *different region* of one shared body.
    // The yrs text CRDT must keep both edits (no last-writer-wins clobber)
    // and must not enqueue a conflict for them.
    let (mut a, ca) = replica(1000);
    let (mut b, cb) = replica(1000);
    let note = a.create_node(NewNode::doc("Note", "Hello world")).unwrap();
    sync_one_way(&a, &mut b, None);

    // A inserts mid-string at t=2000; B appends at the end at t=2500.
    ca.set(2000);
    a.update_node(&note.id, None, Some("Hello brave world".into()))
        .unwrap();
    cb.set(2500);
    b.update_node(&note.id, None, Some("Hello world!".into()))
        .unwrap();

    // Exchange in both directions.
    sync_one_way(&a, &mut b, None);
    sync_one_way(&b, &mut a, None);

    let body_a = a.get_node(&note.id).unwrap().unwrap().body;
    let body_b = b.get_node(&note.id).unwrap().unwrap().body;
    assert_eq!(body_a, body_b, "bodies did not converge");
    // Both edits survive the merge.
    assert_eq!(body_a.as_deref(), Some("Hello brave world!"));
    let no_conflicts =
        a.conflicts_list().unwrap().is_empty() && b.conflicts_list().unwrap().is_empty();
    assert!(no_conflicts, "body merges must not enqueue conflicts");
}
#[test]
fn links_are_an_or_set() {
    // A and B each add a different `Blocks` link out of the same source node
    // at the same physical time. Links form an OR-set, so after exchanging
    // ops both links survive, identically, on both replicas.
    let (mut a, ca) = replica(1000);
    let (mut b, cb) = replica(1000);
    let src = a.create_node(NewNode::doc("src", "")).unwrap();
    let d1 = a.create_node(NewNode::doc("d1", "")).unwrap();
    sync_one_way(&a, &mut b, None);
    let d2 = b.create_node(NewNode::doc("d2", "")).unwrap();
    sync_one_way(&b, &mut a, None);

    ca.set(2000);
    a.add_link(&src.id, &d1.id, heph_core::LinkType::Blocks)
        .unwrap();
    cb.set(2000);
    b.add_link(&src.id, &d2.id, heph_core::LinkType::Blocks)
        .unwrap();
    sync_one_way(&a, &mut b, None);
    sync_one_way(&b, &mut a, None);

    // Collect each replica's `Blocks` destinations out of `src`, sorted so
    // the comparison is order-independent (sorted in place — no clone needed).
    let mut dsts_a: Vec<String> = a
        .outgoing_links(&src.id)
        .unwrap()
        .into_iter()
        .filter(|l| l.link_type == heph_core::LinkType::Blocks)
        .map(|l| l.dst_id)
        .collect();
    dsts_a.sort();
    let mut dsts_b: Vec<String> = b
        .outgoing_links(&src.id)
        .unwrap()
        .into_iter()
        .filter(|l| l.link_type == heph_core::LinkType::Blocks)
        .map(|l| l.dst_id)
        .collect();
    dsts_b.sort();

    assert_eq!(dsts_a, dsts_b, "link sets did not converge");
    assert!(
        dsts_b.contains(&d1.id) && dsts_b.contains(&d2.id),
        "concurrent adds were lost: {dsts_b:?}"
    );
}
#[test]
fn tombstones_propagate_and_are_monotonic() {
    // A tombstone made on A reaches B, and replaying the op logs afterwards
    // (in both directions, including the original create) never revives the
    // node on either replica.
    let (mut a, _ca) = replica(1000);
    let (mut b, _cb) = replica(1000);
    let n = a.create_node(NewNode::doc("doomed", "")).unwrap();
    sync_one_way(&a, &mut b, None);
    a.tombstone_node(&n.id).unwrap();
    sync_one_way(&a, &mut b, None);
    assert!(b.get_node(&n.id).unwrap().unwrap().tombstoned);
    // Tombstoned nodes drop out of search on B (`next` is not exercised here).
    assert!(b.search("doomed").unwrap().is_empty());

    // Monotonic: a further full replay both ways leaves the node dead on both.
    sync_one_way(&b, &mut a, None);
    sync_one_way(&a, &mut b, None);
    assert!(
        a.get_node(&n.id).unwrap().unwrap().tombstoned,
        "replay revived the node on A"
    );
    assert!(
        b.get_node(&n.id).unwrap().unwrap().tombstoned,
        "replay revived the node on B"
    );
    assert!(b.search("doomed").unwrap().is_empty());
}