//! Network sync over real HTTP (tech-spec §6.1, §12, slice 9a). A hub (the //! `sync::router`) runs on an ephemeral TCP port; two spoke replicas exchange //! ops with it via `sync_once` and converge — exactly the offline-first //! everyday config (`local` + `hub_url`). The merge logic is `heph-core`'s, //! proven in its own convergence tests; here we prove the transport carries it. use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, Mutex}; use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Store, TaskState}; use hephd::sync; /// Every replica + the hub adopt this one canonical owner (tech-spec §13). const OWNER: &str = "canonical-user"; #[derive(Clone)] struct StepClock(Arc); impl StepClock { fn new(ms: i64) -> Self { StepClock(Arc::new(AtomicI64::new(ms))) } fn set(&self, ms: i64) { self.0.store(ms, Ordering::SeqCst); } } impl Clock for StepClock { fn now_ms(&self) -> i64 { self.0.load(Ordering::SeqCst) } } type Shared = Arc>; /// A replica backed by a temp SQLite file, sharing the canonical owner. The /// `TempDir` is returned so the caller keeps the file alive. fn replica(now: i64) -> (Shared, StepClock, tempfile::TempDir) { let dir = tempfile::tempdir().unwrap(); let clock = StepClock::new(now); let mut store = LocalStore::open(dir.path().join("heph.db"), Box::new(clock.clone())).unwrap(); store.adopt_owner(OWNER).unwrap(); (Arc::new(Mutex::new(store)), clock, dir) } /// Start the hub router on an ephemeral port; return its base URL. The serve /// task and the hub's `TempDir` are leaked for the test's lifetime. async fn start_hub() -> String { let (hub, _clock, dir) = replica(1000); Box::leak(Box::new(dir)); // keep the hub DB file alive let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let app = sync::router(hub, None); tokio::spawn(async move { axum::serve(listener, app).await.unwrap(); }); format!("http://{addr}") } #[tokio::test] async fn a_node_propagates_a_to_hub_to_b() { let hub_url = start_hub().await; let http = reqwest::Client::new(); let (a, _ca, _da) = replica(1000); let (b, _cb, _db) = replica(1000); let id = { let mut ga = a.lock().unwrap(); ga.create_node(NewNode::doc("Roof", "shingles need work")) .unwrap() .id }; // A pushes to the hub; B pulls from it. let up = sync::sync_once(a.clone(), &hub_url, &http, None) .await .unwrap(); assert!(up.pushed > 0, "A pushed nothing"); let down = sync::sync_once(b.clone(), &hub_url, &http, None) .await .unwrap(); assert!(down.applied > 0, "B applied nothing"); let on_b = b.lock().unwrap().get_node(&id).unwrap().expect("reached B"); assert_eq!(on_b.title, "Roof"); assert_eq!(on_b.body.as_deref(), Some("shingles need work")); } #[tokio::test] async fn spoke_survives_an_unreachable_hub_then_reconciles_when_it_returns() { // "The hub can vanish at any moment" is the base case, not a guarded edge: // a spoke whose hub is down keeps serving + accepting writes, and when the // hub returns its accumulated ops reconcile with no special recovery. This // is what makes a self-updating hub (which restarts under its spokes) safe. let http = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(5)) // never hang the test .build() .unwrap(); let (a, _ca, _da) = replica(1000); // Hub down: work happens locally, and a sync attempt fails *fast* (Err — not // a panic, not a hang) and leaves the store untouched. let id = { let mut ga = a.lock().unwrap(); ga.create_node(NewNode::doc( "Offline note", "written while the hub was down", )) .unwrap() .id }; let dead_hub = "http://127.0.0.1:1"; // nothing listens → connection refused assert!( sync::sync_once(a.clone(), dead_hub, &http, None) .await .is_err(), "sync against a dead hub should error, not hang or panic" ); // The spoke is unharmed: the note is intact and further writes still succeed. { let mut ga = a.lock().unwrap(); assert_eq!(ga.get_node(&id).unwrap().unwrap().title, "Offline note"); ga.create_node(NewNode::doc("Another", "still working offline")) .unwrap(); } // The hub returns: the spoke pushes everything it accumulated while offline, // and a fresh replica pulls it — convergence resumes, no manual recovery. let hub_url = start_hub().await; let up = sync::sync_once(a.clone(), &hub_url, &http, None) .await .unwrap(); assert!(up.pushed > 0, "spoke pushed nothing after the hub returned"); let (b, _cb, _db) = replica(1000); sync::sync_once(b.clone(), &hub_url, &http, None) .await .unwrap(); let on_b = b .lock() .unwrap() .get_node(&id) .unwrap() .expect("offline-authored node reached B after the hub recovered"); assert_eq!(on_b.title, "Offline note"); } #[tokio::test] async fn divergent_scalar_edits_converge_through_the_hub_with_a_conflict() { let hub_url = start_hub().await; let http = reqwest::Client::new(); let (a, ca, _da) = replica(1000); let (b, cb, _db) = replica(1000); // A creates a task and both replicas learn it through the hub. let task_id = { let mut ga = a.lock().unwrap(); ga.create_task(NewTask { title: "Renew passport".into(), attention: Some(Attention::Orange), ..Default::default() }) .unwrap() .node_id }; sync::sync_once(a.clone(), &hub_url, &http, None) .await .unwrap(); sync::sync_once(b.clone(), &hub_url, &http, None) .await .unwrap(); // Divergent offline edits on conflict-tracked fields; B's is later (higher // HLC) so its whole scalar snapshot wins. ca.set(2000); a.lock() .unwrap() .set_task_state(&task_id, TaskState::Done) .unwrap(); cb.set(3000); b.lock() .unwrap() .set_task_attention(&task_id, Attention::Red) .unwrap(); // A few exchanges in each direction settle it. for _ in 0..2 { sync::sync_once(a.clone(), &hub_url, &http, None) .await .unwrap(); sync::sync_once(b.clone(), &hub_url, &http, None) .await .unwrap(); } let ta = a.lock().unwrap().get_task(&task_id).unwrap().unwrap(); let tb = b.lock().unwrap().get_task(&task_id).unwrap().unwrap(); assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}"); assert_eq!(ta.attention, Some(Attention::Red), "later HLC should win"); assert!( !a.lock().unwrap().conflicts_list().unwrap().is_empty(), "A recorded no conflict" ); assert!( !b.lock().unwrap().conflicts_list().unwrap().is_empty(), "B recorded no conflict" ); } #[tokio::test] async fn fresh_hub_seeded_with_owner_id_rebuilds_from_first_sync() { // Path-A seeding: a brand-new hub started with the device's owner id (the // `hephd --owner-id` flag) needs no snapshot copy — the spoke's first sync // replays its whole op-log into the hub, and the hub keeps its own fresh // device origin by construction. let http = reqwest::Client::new(); // The device: its own generated owner, with pre-existing data. let dir = tempfile::tempdir().unwrap(); let mut device = LocalStore::open(dir.path().join("heph.db"), Box::new(StepClock::new(1000))).unwrap(); let owner = device.owner_id().to_string(); let task = device .create_task(NewTask { title: "Pre-hub task".into(), ..Default::default() }) .unwrap(); let device = Arc::new(Mutex::new(device)); // The hub: a fresh store adopting the device's owner (what --owner-id does). let hub_dir = tempfile::tempdir().unwrap(); let mut hub_store = LocalStore::open( hub_dir.path().join("heph.db"), Box::new(StepClock::new(1000)), ) .unwrap(); hub_store.adopt_owner(&owner).unwrap(); let hub = Arc::new(Mutex::new(hub_store)); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let app = sync::router(hub.clone(), None); tokio::spawn(async move { axum::serve(listener, app).await.unwrap(); }); let hub_url = format!("http://{addr}"); let report = sync::sync_once(device.clone(), &hub_url, &http, None) .await .unwrap(); assert!(report.pushed > 0, "device pushed nothing"); let on_hub = hub .lock() .unwrap() .get_node(&task.node_id) .unwrap() .expect("task reached the hub"); assert_eq!(on_hub.title, "Pre-hub task"); }