hephaestus/crates/hephd/src/sync.rs
Erich Blume 8c25d114c4
Some checks failed
Build / validate (pull_request) Failing after 2s
hephd: network sync over HTTP — hub + spoke (sync 9a)
Wire the existing merge engine over the network so the everyday config
(local + hub_url) syncs through a hub. Transport ratified = axum HTTP/JSON
(tech-spec §6.1, §12).

- heph-core: SyncCursors model + Store::sync_state/record_sync over the
  sync_state table (per-peer push/pull HLC cursors). Incremental, so each
  exchange transfers only the tail.
- hephd::sync: the hub router (POST /sync/push, GET /sync/pull?after=<hlc>)
  served from the shared LocalStore, and sync_once — a spoke's pull-then-
  merge, then push-tail exchange, advancing the cursors. Idempotent: a
  re-pushed op the hub already has is a no-op.
- Daemon carries optional hub config; sync.now/sync.status handled at the
  daemon (they need the hub transport the store can't reach). conflicts.
  list/resolve now reachable over the unix socket too.
- main: --mode local|server, --hub-url, --http-addr. server mode binds the
  hub HTTP endpoint on the same store; a local+hub_url spoke background-
  syncs on a 30s interval.
- tests/sync_http.rs: two spokes converge through a real-HTTP hub on an
  ephemeral port — node propagation and a divergent-scalar conflict.

Unauthenticated/single-owner for now; OIDC + per-user scoping is slice 10,
client mode + RemoteStore is 9b. 100 tests green; clippy -D warnings + fmt
+ prek clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 15:14:20 -07:00

176 lines
5.9 KiB
Rust

//! Spoke↔hub op-log sync over HTTP (tech-spec §6.1, §12).
//!
//! The merge engine itself lives in `heph-core` (deterministic, transport-free).
//! This module is the **transport**: a [`router`] the **hub** (server mode)
//! mounts, and [`sync_once`] a **spoke** (`local` + `hub_url`) runs to exchange
//! ops with that hub. Both speak JSON over HTTP with two routes:
//!
//! - `POST /sync/push` — the spoke sends its new ops; the hub merges them.
//! - `GET /sync/pull?after=<hlc>` — the hub returns ops past the spoke's cursor.
//!
//! Exchange is **incremental by HLC cursor** (`sync_state`, [`heph_core::SyncCursors`]):
//! each side transfers only the tail it hasn't sent/seen. Merge is idempotent,
//! so a re-pushed op the hub already has is a harmless no-op. Auth is deferred to
//! tech-spec §13 (slice 10) — the endpoint is currently unauthenticated and
//! scoped to the hub's single owner.
use std::sync::{Arc, Mutex};
use anyhow::Result;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use heph_core::{LocalStore, Op, Store};
/// The shared store handle a hub serves from: a single [`LocalStore`] behind a
/// [`Mutex`], wrapped in an [`Arc`] so the HTTP handlers and [`sync_once`] can
/// each hold their own reference to the same store.
type SharedStore = Arc<Mutex<LocalStore>>;
/// A batch of ops in flight: the JSON body of `POST /sync/push` and the JSON
/// response of `GET /sync/pull`.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpsBody {
/// The ops. The sender's order is not relied upon: the receiver sorts by HLC
/// before applying (see `apply_batch`).
pub ops: Vec<Op>,
}
/// What one [`sync_once`] exchange moved.
///
/// The hub also returns this type as the `POST /sync/push` response; there only
/// `applied` is filled in (counting the *pushed* ops the hub newly applied) and
/// the other fields keep their `Default` of zero.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SyncReport {
/// Ops received from the hub.
pub pulled: usize,
/// Of the pulled ops, how many were newly applied (not already seen).
pub applied: usize,
/// Ops sent to the hub.
pub pushed: usize,
}
/// Run `f` against the locked store on the blocking pool (DB calls never run on
/// an async worker, tech-spec §3).
async fn with_store<T, F>(store: &SharedStore, f: F) -> Result<T>
where
F: FnOnce(&mut LocalStore) -> heph_core::Result<T> + Send + 'static,
T: Send + 'static,
{
let store = store.clone();
let out = tokio::task::spawn_blocking(move || {
let mut guard = store.lock().expect("store mutex poisoned");
f(&mut guard)
})
.await?;
Ok(out?)
}
/// Apply a batch of ops in HLC order.
///
/// Returns `(applied, max_hlc)`: how many ops `apply_op` reported as newly
/// applied, and the highest HLC in the batch (the new cursor position), or
/// `None` when the batch is empty.
///
/// # Errors
///
/// Stops at the first op the store fails to apply and returns that error. Ops
/// earlier in the batch have already been handed to the store by then.
fn apply_batch(
    store: &mut LocalStore,
    mut ops: Vec<Op>,
) -> heph_core::Result<(usize, Option<String>)> {
    // Stable sort: ops carrying equal HLCs keep the order they arrived in.
    ops.sort_by(|a, b| a.hlc.cmp(&b.hlc));

    let mut applied = 0;
    for op in &ops {
        if store.apply_op(op)? {
            applied += 1;
        }
    }

    // The batch is sorted, so its last op carries the highest HLC. Reading it
    // once here avoids cloning an HLC string for every op in the loop.
    let max_hlc = ops.last().map(|op| op.hlc.clone());
    Ok((applied, max_hlc))
}
/// Build the hub's HTTP router (server mode) over `store`; mount it on a TCP
/// listener.
///
/// Routes:
/// - `POST /sync/push` — merge the caller's ops into the store.
/// - `GET /sync/pull` — return ops past the caller's cursor.
pub fn router(store: SharedStore) -> Router {
    let routes: Router<SharedStore> = Router::new()
        .route("/sync/push", post(push))
        .route("/sync/pull", get(pull));
    routes.with_state(store)
}
/// Query string of `GET /sync/pull`.
#[derive(Debug, Deserialize)]
struct PullQuery {
/// HLC cursor — return ops strictly newer than this (absent ⇒ from the start).
/// Thanks to `serde(default)`, a request without `after` deserializes to `None`;
/// the value is handed to the store's `ops_since` unchanged.
#[serde(default)]
after: Option<String>,
}
/// `GET /sync/pull?after=<hlc>` — ops past the caller's cursor, HLC order.
async fn pull(
State(store): State<SharedStore>,
Query(q): Query<PullQuery>,
) -> Result<Json<OpsBody>, StatusCode> {
let ops = with_store(&store, move |s| s.ops_since(q.after.as_deref()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(OpsBody { ops }))
}
/// `POST /sync/push` — merge the caller's ops; reply with how many newly applied.
async fn push(
State(store): State<SharedStore>,
Json(body): Json<OpsBody>,
) -> Result<Json<SyncReport>, StatusCode> {
let (applied, _max) = with_store(&store, move |s| apply_batch(s, body.ops))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(SyncReport {
applied,
..Default::default()
}))
}
/// Exchange ops with `hub_url` once: pull new ops and merge them, then push our
/// new ops. Advances the per-hub cursors so the next call transfers only the
/// tail. `http` is a shared [`reqwest::Client`].
pub async fn sync_once(
store: SharedStore,
hub_url: &str,
http: &reqwest::Client,
) -> Result<SyncReport> {
let base = hub_url.trim_end_matches('/');
let mut report = SyncReport::default();
let cursors = {
let hub = hub_url.to_string();
with_store(&store, move |s| s.sync_state(&hub)).await?
};
// --- pull then merge ---
let mut req = http.get(format!("{base}/sync/pull"));
if let Some(after) = &cursors.last_pulled_hlc {
req = req.query(&[("after", after)]);
}
let pulled: OpsBody = req.send().await?.error_for_status()?.json().await?;
report.pulled = pulled.ops.len();
if !pulled.ops.is_empty() {
let (applied, max_pulled) = with_store(&store, move |s| apply_batch(s, pulled.ops)).await?;
report.applied = applied;
if let Some(cursor) = max_pulled {
let hub = hub_url.to_string();
with_store(&store, move |s| s.record_sync(&hub, None, Some(&cursor))).await?;
}
}
// --- push our tail ---
let to_push = {
let after = cursors.last_pushed_hlc.clone();
with_store(&store, move |s| s.ops_since(after.as_deref())).await?
};
report.pushed = to_push.len();
if !to_push.is_empty() {
// `ops_since` returns HLC order, so the last is the new cursor.
let max_pushed = to_push.last().map(|o| o.hlc.clone());
http.post(format!("{base}/sync/push"))
.json(&OpsBody { ops: to_push })
.send()
.await?
.error_for_status()?;
if let Some(cursor) = max_pushed {
let hub = hub_url.to_string();
with_store(&store, move |s| s.record_sync(&hub, Some(&cursor), None)).await?;
}
}
Ok(report)
}