generated from eblume/project-template
Lock in the base-case guarantee that a self-updating hub (which restarts under its spokes) relies on. New sync_http test: a spoke whose hub is unreachable keeps serving + accepting writes, a sync attempt fails fast (Err, not hang/panic), and when the hub returns, the accumulated ops reconcile with no special recovery. The verification surfaced one non-graceful path — the daemon's shared reqwest client had no timeout, so a black-hole hub (connects, never replies) could stall the sync/self-update loop. Give it a 30s timeout so 'the hub can vanish at any moment' holds even mid-request. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
206 lines
7 KiB
Rust
206 lines
7 KiB
Rust
//! Network sync over real HTTP (tech-spec §6.1, §12, slice 9a). A hub (the
//! `sync::router`) runs on an ephemeral TCP port; two spoke replicas exchange
//! ops with it via `sync_once` and converge — exactly the offline-first
//! everyday config (`local` + `hub_url`). The merge logic is `heph-core`'s,
//! proven in its own convergence tests; here we prove the transport carries it.
|
|
|
|
use std::sync::atomic::{AtomicI64, Ordering};
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Store, TaskState};
|
|
use hephd::sync;
|
|
|
|
/// The single canonical owner id (tech-spec §13). `replica` adopts it on every
/// store it builds, so the hub and all spokes in these tests share one owner.
const OWNER: &str = "canonical-user";
|
|
|
|
/// A manually stepped millisecond clock for deterministic tests.
///
/// The reading lives in an `Arc<AtomicI64>`, so every clone observes (and can
/// change) the same value.
#[derive(Clone)]
struct StepClock(Arc<AtomicI64>);

impl StepClock {
    /// Creates a clock whose current reading is `ms`.
    fn new(ms: i64) -> Self {
        let cell = AtomicI64::new(ms);
        Self(Arc::new(cell))
    }

    /// Overwrites the reading with `ms`; the change is visible through every
    /// clone of this clock.
    fn set(&self, ms: i64) {
        let Self(cell) = self;
        cell.store(ms, Ordering::SeqCst);
    }
}
|
|
/// Lets a `StepClock` stand in wherever a `Clock` is required (it is boxed and
/// handed to `LocalStore::open` in `replica`).
impl Clock for StepClock {
    /// Returns the reading most recently stored in the shared atomic.
    fn now_ms(&self) -> i64 {
        let Self(cell) = self;
        cell.load(Ordering::SeqCst)
    }
}
|
|
|
|
/// A `LocalStore` behind `Arc<Mutex<..>>`: the handle type that `replica`
/// returns and that is passed to `sync::router` / `sync::sync_once`.
type Shared = Arc<Mutex<LocalStore>>;
|
|
|
|
/// A replica backed by a temp SQLite file, sharing the canonical owner. The
|
|
/// `TempDir` is returned so the caller keeps the file alive.
|
|
fn replica(now: i64) -> (Shared, StepClock, tempfile::TempDir) {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let clock = StepClock::new(now);
|
|
let mut store = LocalStore::open(dir.path().join("heph.db"), Box::new(clock.clone())).unwrap();
|
|
store.adopt_owner(OWNER).unwrap();
|
|
(Arc::new(Mutex::new(store)), clock, dir)
|
|
}
|
|
|
|
/// Start the hub router on an ephemeral port; return its base URL. The serve
|
|
/// task and the hub's `TempDir` are leaked for the test's lifetime.
|
|
async fn start_hub() -> String {
|
|
let (hub, _clock, dir) = replica(1000);
|
|
Box::leak(Box::new(dir)); // keep the hub DB file alive
|
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let addr = listener.local_addr().unwrap();
|
|
let app = sync::router(hub, None);
|
|
tokio::spawn(async move {
|
|
axum::serve(listener, app).await.unwrap();
|
|
});
|
|
format!("http://{addr}")
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn a_node_propagates_a_to_hub_to_b() {
|
|
let hub_url = start_hub().await;
|
|
let http = reqwest::Client::new();
|
|
let (a, _ca, _da) = replica(1000);
|
|
let (b, _cb, _db) = replica(1000);
|
|
|
|
let id = {
|
|
let mut ga = a.lock().unwrap();
|
|
ga.create_node(NewNode::doc("Roof", "shingles need work"))
|
|
.unwrap()
|
|
.id
|
|
};
|
|
|
|
// A pushes to the hub; B pulls from it.
|
|
let up = sync::sync_once(a.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
assert!(up.pushed > 0, "A pushed nothing");
|
|
let down = sync::sync_once(b.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
assert!(down.applied > 0, "B applied nothing");
|
|
|
|
let on_b = b.lock().unwrap().get_node(&id).unwrap().expect("reached B");
|
|
assert_eq!(on_b.title, "Roof");
|
|
assert_eq!(on_b.body.as_deref(), Some("shingles need work"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn spoke_survives_an_unreachable_hub_then_reconciles_when_it_returns() {
|
|
// "The hub can vanish at any moment" is the base case, not a guarded edge:
|
|
// a spoke whose hub is down keeps serving + accepting writes, and when the
|
|
// hub returns its accumulated ops reconcile with no special recovery. This
|
|
// is what makes a self-updating hub (which restarts under its spokes) safe.
|
|
let http = reqwest::Client::builder()
|
|
.timeout(std::time::Duration::from_secs(5)) // never hang the test
|
|
.build()
|
|
.unwrap();
|
|
let (a, _ca, _da) = replica(1000);
|
|
|
|
// Hub down: work happens locally, and a sync attempt fails *fast* (Err — not
|
|
// a panic, not a hang) and leaves the store untouched.
|
|
let id = {
|
|
let mut ga = a.lock().unwrap();
|
|
ga.create_node(NewNode::doc(
|
|
"Offline note",
|
|
"written while the hub was down",
|
|
))
|
|
.unwrap()
|
|
.id
|
|
};
|
|
let dead_hub = "http://127.0.0.1:1"; // nothing listens → connection refused
|
|
assert!(
|
|
sync::sync_once(a.clone(), dead_hub, &http, None)
|
|
.await
|
|
.is_err(),
|
|
"sync against a dead hub should error, not hang or panic"
|
|
);
|
|
|
|
// The spoke is unharmed: the note is intact and further writes still succeed.
|
|
{
|
|
let mut ga = a.lock().unwrap();
|
|
assert_eq!(ga.get_node(&id).unwrap().unwrap().title, "Offline note");
|
|
ga.create_node(NewNode::doc("Another", "still working offline"))
|
|
.unwrap();
|
|
}
|
|
|
|
// The hub returns: the spoke pushes everything it accumulated while offline,
|
|
// and a fresh replica pulls it — convergence resumes, no manual recovery.
|
|
let hub_url = start_hub().await;
|
|
let up = sync::sync_once(a.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
assert!(up.pushed > 0, "spoke pushed nothing after the hub returned");
|
|
let (b, _cb, _db) = replica(1000);
|
|
sync::sync_once(b.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
let on_b = b
|
|
.lock()
|
|
.unwrap()
|
|
.get_node(&id)
|
|
.unwrap()
|
|
.expect("offline-authored node reached B after the hub recovered");
|
|
assert_eq!(on_b.title, "Offline note");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn divergent_scalar_edits_converge_through_the_hub_with_a_conflict() {
|
|
let hub_url = start_hub().await;
|
|
let http = reqwest::Client::new();
|
|
let (a, ca, _da) = replica(1000);
|
|
let (b, cb, _db) = replica(1000);
|
|
|
|
// A creates a task and both replicas learn it through the hub.
|
|
let task_id = {
|
|
let mut ga = a.lock().unwrap();
|
|
ga.create_task(NewTask {
|
|
title: "Renew passport".into(),
|
|
attention: Some(Attention::Orange),
|
|
..Default::default()
|
|
})
|
|
.unwrap()
|
|
.node_id
|
|
};
|
|
sync::sync_once(a.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
sync::sync_once(b.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Divergent offline edits on conflict-tracked fields; B's is later (higher
|
|
// HLC) so its whole scalar snapshot wins.
|
|
ca.set(2000);
|
|
a.lock()
|
|
.unwrap()
|
|
.set_task_state(&task_id, TaskState::Done)
|
|
.unwrap();
|
|
cb.set(3000);
|
|
b.lock()
|
|
.unwrap()
|
|
.set_task_attention(&task_id, Attention::Red)
|
|
.unwrap();
|
|
|
|
// A few exchanges in each direction settle it.
|
|
for _ in 0..2 {
|
|
sync::sync_once(a.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
sync::sync_once(b.clone(), &hub_url, &http, None)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let ta = a.lock().unwrap().get_task(&task_id).unwrap().unwrap();
|
|
let tb = b.lock().unwrap().get_task(&task_id).unwrap().unwrap();
|
|
assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}");
|
|
assert_eq!(ta.attention, Some(Attention::Red), "later HLC should win");
|
|
assert!(
|
|
!a.lock().unwrap().conflicts_list().unwrap().is_empty(),
|
|
"A recorded no conflict"
|
|
);
|
|
assert!(
|
|
!b.lock().unwrap().conflicts_list().unwrap().is_empty(),
|
|
"B recorded no conflict"
|
|
);
|
|
}
|