hephaestus/crates/heph-core/tests/convergence.rs
Erich Blume 497c62a988
Some checks failed
Build / validate (pull_request) Failing after 3s
hephd: OIDC hub authentication — verification side (auth 10a)
Authenticate op exchange at the network boundary (tech-spec §13). The hub
now requires a valid OIDC bearer token on /sync/* and /rpc; local mode is
unchanged (no auth).

- heph-core: Store::authorize_owner_sub — single-tenant gate that claims the
  owner's oidc_sub on first sight, then authorizes only that sub (403 for any
  other identity). LocalStore impl over users.oidc_sub; RemoteStore stub.
- hephd auth module: TokenVerifier trait (mockable seam) + OidcVerifier
  (jsonwebtoken, rust_crypto). Strict validation: RS256 pinned, exact iss +
  aud, exp/nbf, required sub; JWKS discovered + cached, refetched on unknown
  kid (rotation). Claims/AuthError.
- Hub router takes Option<verifier>; an axum middleware on every route
  extracts the Bearer token, verifies it off the async worker, and runs the
  owner gate — 401 missing/invalid, 403 wrong identity, 503 IdP-unreachable.
  Open (no auth) when unconfigured, for local dev.
- main: --oidc-issuer/--oidc-audience enable the hub verifier (server mode).
- Security tests, all offline: stub-verifier middleware (missing/bad/valid +
  owner gate) and an adversarial battery driving OidcVerifier against an
  in-process mock IdP — rejects expired, wrong iss/aud, unknown kid, tampered
  signature, alg confusion (HS256/none), and missing sub. The RSA key + JWKS
  are generated at runtime (rsa/rand/base64 dev-deps) so no key is committed.
- tech-spec: add an end-of-v1 dependency-refresh pass to the roadmap.

108 tests green; clippy -D warnings + fmt + prek clean. Next: client-side
device-code login + keyring (10b).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 15:58:20 -07:00

261 lines
8.6 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Op-log + merge convergence (tech-spec §12, slice 8b8d). Two local replicas
//! (distinct origins) exchange ops; we assert they converge, that ambiguous
//! scalar merges surface as conflicts, and that concurrent body edits *merge*
//! through the yrs text CRDT instead of clobbering with last-writer-wins. No
//! network yet — we hand ops across directly.
use heph_core::{
Attention, Clock, LocalStore, NewNode, NewTask, Op, Store, SyncCursors, TaskState,
};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct StepClock(Arc<AtomicI64>);
impl StepClock {
fn new(ms: i64) -> Self {
StepClock(Arc::new(AtomicI64::new(ms)))
}
fn set(&self, ms: i64) {
self.0.store(ms, Ordering::SeqCst);
}
}
impl Clock for StepClock {
fn now_ms(&self) -> i64 {
self.0.load(Ordering::SeqCst)
}
}
/// The shared canonical user both replicas adopt — the §13 model where every
/// device of one human carries the same owner id.
const OWNER: &str = "canonical-user";
fn replica(now: i64) -> (LocalStore, StepClock) {
let c = StepClock::new(now);
let mut s = LocalStore::open_in_memory(Box::new(c.clone())).unwrap();
s.adopt_owner(OWNER).unwrap();
(s, c)
}
/// Push every op from `src` (after `cursor`) into `dst`, in HLC order.
fn sync_one_way(src: &dyn Store, dst: &mut dyn Store, cursor: Option<&str>) -> Option<String> {
let ops: Vec<Op> = src.ops_since(cursor).unwrap();
let mut last = cursor.map(str::to_string);
for op in &ops {
dst.apply_op(op).unwrap();
last = Some(op.hlc.clone());
}
last
}
#[test]
fn sync_cursors_default_empty_then_advance_per_direction() {
let (mut a, _ca) = replica(1000);
const HUB: &str = "https://hub.example";
assert_eq!(a.sync_state(HUB).unwrap(), SyncCursors::default());
// Advancing only the push cursor leaves pull untouched, and vice versa.
a.record_sync(HUB, Some("hlc-push-1"), None).unwrap();
a.record_sync(HUB, None, Some("hlc-pull-1")).unwrap();
assert_eq!(
a.sync_state(HUB).unwrap(),
SyncCursors {
last_pushed_hlc: Some("hlc-push-1".into()),
last_pulled_hlc: Some("hlc-pull-1".into()),
}
);
// A second peer is tracked independently.
a.record_sync("other", Some("x"), None).unwrap();
assert_eq!(
a.sync_state(HUB).unwrap().last_pushed_hlc.as_deref(),
Some("hlc-push-1")
);
}
#[test]
fn owner_sub_gate_claims_first_then_requires_match() {
// Single-tenant gate (§13): the first sub claims the owner; only that sub
// is authorized thereafter.
let (mut a, _ca) = replica(1000);
assert!(a.authorize_owner_sub("sub-alice").unwrap(), "first claims");
assert!(a.authorize_owner_sub("sub-alice").unwrap(), "same sub ok");
assert!(
!a.authorize_owner_sub("sub-mallory").unwrap(),
"a different identity must be rejected"
);
// Still bound to the original after a rejection.
assert!(a.authorize_owner_sub("sub-alice").unwrap());
}
#[test]
fn online_round_trip_propagates_a_node() {
let (mut a, _ca) = replica(1000);
let (mut b, _cb) = replica(1000);
let n = a
.create_node(NewNode::doc("Roof", "shingles need work"))
.unwrap();
sync_one_way(&a, &mut b, None);
let on_b = b.get_node(&n.id).unwrap().expect("node reached B");
assert_eq!(on_b.title, "Roof");
assert_eq!(on_b.body.as_deref(), Some("shingles need work"));
}
#[test]
fn apply_is_idempotent() {
let (mut a, _ca) = replica(1000);
let (mut b, _cb) = replica(1000);
let n = a.create_node(NewNode::doc("X", "y")).unwrap();
// Apply all of A's ops to B twice — second pass is a no-op.
sync_one_way(&a, &mut b, None);
let again: Vec<Op> = a.ops_since(None).unwrap();
for op in &again {
assert!(
!b.apply_op(op).unwrap(),
"re-applying {} mutated B",
op.op_type
);
}
assert_eq!(b.get_node(&n.id).unwrap().unwrap().title, "X");
}
#[test]
fn offline_divergent_scalar_edits_converge_with_a_conflict() {
// A creates a task; B learns it. Then both go offline and set a different
// do_date. After exchanging, both converge to the higher-HLC value and each
// records a conflict for the discarded value.
let (mut a, ca) = replica(1000);
let (mut b, cb) = replica(1000);
let task = a
.create_task(NewTask {
title: "Renew passport".into(),
attention: Some(Attention::Orange),
..Default::default()
})
.unwrap();
sync_one_way(&a, &mut b, None);
// Divergent offline edits. B's edit is later (higher physical time) → wins.
ca.set(2000);
a.set_task_state(&task.node_id, TaskState::Done).unwrap();
cb.set(3000);
b.set_task_attention(&task.node_id, Attention::Red).unwrap();
// Give each a distinct do_date too (the conflicting field).
// (set_task_* above already produced task.set ops snapshotting scalars.)
// Exchange the divergent ops.
sync_one_way(&a, &mut b, None);
sync_one_way(&b, &mut a, None);
// Both replicas converge to identical task scalars.
let ta = a.get_task(&task.node_id).unwrap().unwrap();
let tb = b.get_task(&task.node_id).unwrap().unwrap();
assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}");
// B wrote last (t=3000) → its attention=red wins on both.
assert_eq!(ta.attention, Some(Attention::Red));
// Each replica surfaced the divergence as a conflict (not silently merged).
assert!(
!a.conflicts_list().unwrap().is_empty(),
"A recorded no conflict"
);
assert!(
!b.conflicts_list().unwrap().is_empty(),
"B recorded no conflict"
);
}
#[test]
fn concurrent_body_edits_merge_via_crdt() {
// Both replicas edit *different regions* of a shared body offline. The text
// CRDT merges both edits — neither is lost to last-writer-wins, and no
// conflict is enqueued.
let (mut a, ca) = replica(1000);
let (mut b, cb) = replica(1000);
let n = a.create_node(NewNode::doc("Note", "Hello world")).unwrap();
sync_one_way(&a, &mut b, None);
// A inserts in the middle; B appends at the end.
ca.set(2000);
a.update_node(&n.id, None, Some("Hello brave world".into()))
.unwrap();
cb.set(2500);
b.update_node(&n.id, None, Some("Hello world!".into()))
.unwrap();
sync_one_way(&a, &mut b, None);
sync_one_way(&b, &mut a, None);
let ba = a.get_node(&n.id).unwrap().unwrap().body;
let bb = b.get_node(&n.id).unwrap().unwrap().body;
assert_eq!(ba, bb, "bodies did not converge");
assert_eq!(ba.as_deref(), Some("Hello brave world!")); // both edits survive
assert!(
a.conflicts_list().unwrap().is_empty() && b.conflicts_list().unwrap().is_empty(),
"body merges must not enqueue conflicts"
);
}
#[test]
fn links_are_an_or_set() {
// A and B each add a different link to the same node; both survive.
let (mut a, ca) = replica(1000);
let (mut b, cb) = replica(1000);
let src = a.create_node(NewNode::doc("src", "")).unwrap();
let d1 = a.create_node(NewNode::doc("d1", "")).unwrap();
sync_one_way(&a, &mut b, None);
let d2 = b.create_node(NewNode::doc("d2", "")).unwrap();
sync_one_way(&b, &mut a, None);
ca.set(2000);
a.add_link(&src.id, &d1.id, heph_core::LinkType::Blocks)
.unwrap();
cb.set(2000);
b.add_link(&src.id, &d2.id, heph_core::LinkType::Blocks)
.unwrap();
sync_one_way(&a, &mut b, None);
sync_one_way(&b, &mut a, None);
let dsts_a: Vec<String> = a
.outgoing_links(&src.id)
.unwrap()
.into_iter()
.filter(|l| l.link_type == heph_core::LinkType::Blocks)
.map(|l| l.dst_id)
.collect();
let mut dsts_b: Vec<String> = b
.outgoing_links(&src.id)
.unwrap()
.into_iter()
.filter(|l| l.link_type == heph_core::LinkType::Blocks)
.map(|l| l.dst_id)
.collect();
let mut sorted_a = dsts_a.clone();
sorted_a.sort();
dsts_b.sort();
assert_eq!(sorted_a, dsts_b, "link sets did not converge");
assert!(dsts_b.contains(&d1.id) && dsts_b.contains(&d2.id));
}
#[test]
fn tombstones_propagate_and_are_monotonic() {
let (mut a, _ca) = replica(1000);
let (mut b, _cb) = replica(1000);
let n = a.create_node(NewNode::doc("doomed", "")).unwrap();
sync_one_way(&a, &mut b, None);
a.tombstone_node(&n.id).unwrap();
sync_one_way(&a, &mut b, None);
assert!(b.get_node(&n.id).unwrap().unwrap().tombstoned);
// Tombstoned nodes drop out of search/next on B.
assert!(b.search("doomed").unwrap().is_empty());
}