hephaestus/crates/hephd/tests/sync_http.rs
Erich Blume f4db186234
Some checks failed
Build / validate (pull_request) Failing after 9s
hephd: OIDC client auth — device-code flow + token attach (auth 10b)
Close the auth loop: clients obtain a bearer token and present it to the
hub (tech-spec §13).

- oauth module: DeviceFlow (RFC 8628 — discover, start, poll handling
  authorization_pending/slow_down, refresh) + StoredToken + TokenStore
  (OS keyring via `keyring`, in-memory for tests) + current_bearer (loads
  and refreshes-on-expiry).
- heph auth login/logout: runs the device flow, prints the verification
  URL + user code, caches the token in the keyring.
- sync_once gains a bearer arg; the daemon (Daemon::spawn_sync_loop +
  sync.now) obtains it via current_bearer; RemoteStore attaches it to /rpc.
  --oidc-issuer/--oidc-client-id configure the spoke/client.
- Fix a latent panic: reqwest::blocking spins its own runtime and panics
  inside the daemon's spawn_blocking pool. All blocking auth/proxy HTTP
  (OidcVerifier JWKS, DeviceFlow, RemoteStore) now uses runtime-free `ureq`;
  async reqwest remains only for sync_once. (Caught by the new e2e test.)
- Tests (offline): device flow + refresh + token store vs a mock OAuth
  server; a full spoke->authenticated-hub loop (valid token accepted,
  missing token rejected) signed by a runtime-generated RSA key.

112 tests green; clippy -D warnings + fmt + prek clean. Slice 10 (auth)
complete; next is heph.nvim.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 16:27:36 -07:00

147 lines
4.8 KiB
Rust

//! Network sync over real HTTP (tech-spec §6.1, §12, slice 9a). A hub (the
//! `sync::router`) runs on an ephemeral TCP port; two spoke replicas exchange
//! ops with it via `sync_once` and converge — exactly the offline-first
//! everyday config (`local` + `hub_url`). The merge logic is `heph-core`'s,
//! proven in its own convergence tests; here we prove the transport carries it.
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Store, TaskState};
use hephd::sync;
/// Every replica + the hub adopt this one canonical owner (tech-spec §13).
const OWNER: &str = "canonical-user";
#[derive(Clone)]
struct StepClock(Arc<AtomicI64>);
impl StepClock {
fn new(ms: i64) -> Self {
StepClock(Arc::new(AtomicI64::new(ms)))
}
fn set(&self, ms: i64) {
self.0.store(ms, Ordering::SeqCst);
}
}
impl Clock for StepClock {
fn now_ms(&self) -> i64 {
self.0.load(Ordering::SeqCst)
}
}
type Shared = Arc<Mutex<LocalStore>>;
/// A replica backed by a temp SQLite file, sharing the canonical owner. The
/// `TempDir` is returned so the caller keeps the file alive.
fn replica(now: i64) -> (Shared, StepClock, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let clock = StepClock::new(now);
let mut store = LocalStore::open(dir.path().join("heph.db"), Box::new(clock.clone())).unwrap();
store.adopt_owner(OWNER).unwrap();
(Arc::new(Mutex::new(store)), clock, dir)
}
/// Start the hub router on an ephemeral port; return its base URL. The serve
/// task and the hub's `TempDir` are leaked for the test's lifetime.
async fn start_hub() -> String {
let (hub, _clock, dir) = replica(1000);
Box::leak(Box::new(dir)); // keep the hub DB file alive
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let app = sync::router(hub, None);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{addr}")
}
#[tokio::test]
async fn a_node_propagates_a_to_hub_to_b() {
let hub_url = start_hub().await;
let http = reqwest::Client::new();
let (a, _ca, _da) = replica(1000);
let (b, _cb, _db) = replica(1000);
let id = {
let mut ga = a.lock().unwrap();
ga.create_node(NewNode::doc("Roof", "shingles need work"))
.unwrap()
.id
};
// A pushes to the hub; B pulls from it.
let up = sync::sync_once(a.clone(), &hub_url, &http, None)
.await
.unwrap();
assert!(up.pushed > 0, "A pushed nothing");
let down = sync::sync_once(b.clone(), &hub_url, &http, None)
.await
.unwrap();
assert!(down.applied > 0, "B applied nothing");
let on_b = b.lock().unwrap().get_node(&id).unwrap().expect("reached B");
assert_eq!(on_b.title, "Roof");
assert_eq!(on_b.body.as_deref(), Some("shingles need work"));
}
#[tokio::test]
async fn divergent_scalar_edits_converge_through_the_hub_with_a_conflict() {
let hub_url = start_hub().await;
let http = reqwest::Client::new();
let (a, ca, _da) = replica(1000);
let (b, cb, _db) = replica(1000);
// A creates a task and both replicas learn it through the hub.
let task_id = {
let mut ga = a.lock().unwrap();
ga.create_task(NewTask {
title: "Renew passport".into(),
attention: Some(Attention::Orange),
..Default::default()
})
.unwrap()
.node_id
};
sync::sync_once(a.clone(), &hub_url, &http, None)
.await
.unwrap();
sync::sync_once(b.clone(), &hub_url, &http, None)
.await
.unwrap();
// Divergent offline edits on conflict-tracked fields; B's is later (higher
// HLC) so its whole scalar snapshot wins.
ca.set(2000);
a.lock()
.unwrap()
.set_task_state(&task_id, TaskState::Done)
.unwrap();
cb.set(3000);
b.lock()
.unwrap()
.set_task_attention(&task_id, Attention::Red)
.unwrap();
// A few exchanges in each direction settle it.
for _ in 0..2 {
sync::sync_once(a.clone(), &hub_url, &http, None)
.await
.unwrap();
sync::sync_once(b.clone(), &hub_url, &http, None)
.await
.unwrap();
}
let ta = a.lock().unwrap().get_task(&task_id).unwrap().unwrap();
let tb = b.lock().unwrap().get_task(&task_id).unwrap().unwrap();
assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}");
assert_eq!(ta.attention, Some(Attention::Red), "later HLC should win");
assert!(
!a.lock().unwrap().conflicts_list().unwrap().is_empty(),
"A recorded no conflict"
);
assert!(
!b.lock().unwrap().conflicts_list().unwrap().is_empty(),
"B recorded no conflict"
);
}