hephaestus/crates/hephd/src/sync.rs
Erich Blume f4db186234
Some checks failed
Build / validate (pull_request) Failing after 9s
hephd: OIDC client auth — device-code flow + token attach (auth 10b)
Close the auth loop: clients obtain a bearer token and present it to the
hub (tech-spec §13).

- oauth module: DeviceFlow (RFC 8628 — discover, start, poll handling
  authorization_pending/slow_down, refresh) + StoredToken + TokenStore
  (OS keyring via `keyring`, in-memory for tests) + current_bearer (loads
  and refreshes-on-expiry).
- heph auth login/logout: runs the device flow, prints the verification
  URL + user code, caches the token in the keyring.
- sync_once gains a bearer arg; the daemon (Daemon::spawn_sync_loop +
  sync.now) obtains it via current_bearer; RemoteStore attaches it to /rpc.
  --oidc-issuer/--oidc-client-id configure the spoke/client.
- Fix a latent panic: reqwest::blocking spins its own runtime and panics
  inside the daemon's spawn_blocking pool. All blocking auth/proxy HTTP
  (OidcVerifier JWKS, DeviceFlow, RemoteStore) now uses runtime-free `ureq`;
  async reqwest remains only for sync_once. (Caught by the new e2e test.)
- Tests (offline): device flow + refresh + token store vs a mock OAuth
  server; a full spoke->authenticated-hub loop (valid token accepted,
  missing token rejected) signed by a runtime-generated RSA key.

112 tests green; clippy -D warnings + fmt + prek clean. Slice 10 (auth)
complete; next is heph.nvim.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 16:27:36 -07:00

290 lines
10 KiB
Rust

//! Spoke↔hub op-log sync over HTTP (tech-spec §6.1, §12).
//!
//! The merge engine itself lives in `heph-core` (deterministic, transport-free).
//! This module is the **transport**: a [`router`] the **hub** (server mode)
//! mounts, and [`sync_once`] a **spoke** (`local` + `hub_url`) runs to exchange
//! ops with that hub. Both speak JSON over HTTP with two routes:
//!
//! - `POST /sync/push` — the spoke sends its new ops; the hub merges them.
//! - `GET /sync/pull?after=<hlc>` — the hub returns ops past the spoke's cursor.
//! - `POST /rpc` — the full daemon API ([`crate::rpc::dispatch`]) over HTTP, for
//! a no-replica `client`-mode [`crate::remote::RemoteStore`] to proxy against.
//!
//! Exchange is **incremental by HLC cursor** (`sync_state`, [`heph_core::SyncCursors`]):
//! each side transfers only the tail it hasn't sent/seen. Merge is idempotent,
//! so a re-pushed op the hub already has is a harmless no-op. When the hub is
//! configured with a verifier ([`crate::auth`]), every route requires a valid
//! OIDC bearer token whose `sub` owns the hub (tech-spec §13); spokes attach
//! that token via the `bearer` argument to [`sync_once`].
use std::sync::{Arc, Mutex};
use anyhow::Result;
use axum::extract::{Query, Request, State};
use axum::http::StatusCode;
use axum::middleware::{self, Next};
use axum::response::Response as AxumResponse;
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use heph_core::{Op, Store};
use crate::auth::{AuthError, TokenVerifier};
use crate::rpc::{self, Response, RpcError, INTERNAL_ERROR};
/// The shared store a hub serves from — any [`Store`], so a `server` fronts a
/// `LocalStore` and (later modes) could front another backend.
pub type SharedStore = Arc<Mutex<dyn Store + Send>>;
/// What the hub HTTP routes share: the store and (when authentication is
/// configured) the bearer-token verifier.
#[derive(Clone)]
struct HubState {
store: SharedStore,
verifier: Option<Arc<dyn TokenVerifier>>,
}
/// A batch of ops in flight (push body / pull response).
#[derive(Debug, Serialize, Deserialize)]
pub struct OpsBody {
/// The ops, applied in HLC order by the receiver.
pub ops: Vec<Op>,
}
/// What one [`sync_once`] exchange moved.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SyncReport {
/// Ops received from the hub.
pub pulled: usize,
/// Of the pulled ops, how many were newly applied (not already seen).
pub applied: usize,
/// Ops sent to the hub.
pub pushed: usize,
}
/// Run `f` against the locked store on the blocking pool (DB calls never run on
/// an async worker, tech-spec §3).
async fn with_store<T, F>(store: &SharedStore, f: F) -> Result<T>
where
F: FnOnce(&mut (dyn Store + Send)) -> heph_core::Result<T> + Send + 'static,
T: Send + 'static,
{
let store = store.clone();
let out = tokio::task::spawn_blocking(move || {
let mut guard = store.lock().expect("store mutex poisoned");
f(&mut *guard)
})
.await?;
Ok(out?)
}
/// Apply a batch of ops in HLC order, returning how many were newly applied and
/// the highest HLC seen (the new cursor position).
fn apply_batch(
store: &mut (dyn Store + Send),
mut ops: Vec<Op>,
) -> heph_core::Result<(usize, Option<String>)> {
ops.sort_by(|a, b| a.hlc.cmp(&b.hlc));
let mut applied = 0;
let mut max_hlc = None;
for op in &ops {
if store.apply_op(op)? {
applied += 1;
}
max_hlc = Some(op.hlc.clone());
}
Ok((applied, max_hlc))
}
/// The hub's HTTP router (server mode). Mount it on a TCP listener. When
/// `verifier` is `Some`, every route requires a valid OIDC bearer token whose
/// `sub` owns this hub (tech-spec §13); `None` leaves the hub open (local dev).
pub fn router(store: SharedStore, verifier: Option<Arc<dyn TokenVerifier>>) -> Router {
let state = HubState { store, verifier };
Router::new()
.route("/sync/pull", get(pull))
.route("/sync/push", post(push))
.route("/rpc", post(rpc_call))
.route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
.with_state(state)
}
/// Reject any request lacking a valid bearer token whose `sub` owns this hub.
/// A no-op when the hub has no verifier configured (open dev mode).
async fn require_auth(
State(state): State<HubState>,
request: Request,
next: Next,
) -> Result<AxumResponse, StatusCode> {
let Some(verifier) = state.verifier.clone() else {
return Ok(next.run(request).await); // open: no auth configured
};
let Some(token) = bearer_token(&request) else {
return Err(StatusCode::UNAUTHORIZED);
};
// Verification (and the store gate) hit the network / DB — run off the async
// worker on the blocking pool.
let claims = tokio::task::spawn_blocking(move || verifier.verify(&token))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.map_err(|e| match e {
AuthError::Provider(_) => StatusCode::SERVICE_UNAVAILABLE,
_ => StatusCode::UNAUTHORIZED,
})?;
// Single-tenant gate: the token's identity must own this hub.
let store = state.store.clone();
let owns = tokio::task::spawn_blocking(move || {
store
.lock()
.expect("store mutex poisoned")
.authorize_owner_sub(&claims.sub)
})
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if !owns {
return Err(StatusCode::FORBIDDEN);
}
Ok(next.run(request).await)
}
/// Extract the `Authorization: Bearer <token>` value, if present.
fn bearer_token(request: &Request) -> Option<String> {
request
.headers()
.get(axum::http::header::AUTHORIZATION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "))
.map(str::to_string)
}
/// One `POST /rpc` call: a method name + params, mirroring the unix-socket RPC.
#[derive(Debug, Deserialize)]
struct RpcCall {
method: String,
#[serde(default)]
params: Value,
}
/// `POST /rpc` — run one [`rpc::dispatch`] call on the hub's store and return a
/// JSON-RPC-shaped [`Response`] (result xor error). Always HTTP 200; method
/// failures travel in the body so the client can reconstruct the error.
async fn rpc_call(State(state): State<HubState>, Json(call): Json<RpcCall>) -> Json<Response> {
let store = state.store.clone();
let dispatched = tokio::task::spawn_blocking(move || {
let mut guard = store.lock().expect("store mutex poisoned");
rpc::dispatch(&mut *guard, &call.method, call.params)
})
.await;
let response = match dispatched {
Ok(Ok(value)) => Response::ok(Value::Null, value),
Ok(Err(rpc_err)) => Response::failed(Value::Null, rpc_err),
Err(join_err) => Response::failed(
Value::Null,
RpcError {
code: INTERNAL_ERROR,
message: format!("dispatch task failed: {join_err}"),
},
),
};
Json(response)
}
#[derive(Debug, Deserialize)]
struct PullQuery {
/// HLC cursor — return ops strictly newer than this (absent ⇒ from the start).
#[serde(default)]
after: Option<String>,
}
/// `GET /sync/pull?after=<hlc>` — ops past the caller's cursor, HLC order.
async fn pull(
State(state): State<HubState>,
Query(q): Query<PullQuery>,
) -> Result<Json<OpsBody>, StatusCode> {
let ops = with_store(&state.store, move |s| s.ops_since(q.after.as_deref()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(OpsBody { ops }))
}
/// `POST /sync/push` — merge the caller's ops; reply with how many newly applied.
async fn push(
State(state): State<HubState>,
Json(body): Json<OpsBody>,
) -> Result<Json<SyncReport>, StatusCode> {
let (applied, _max) = with_store(&state.store, move |s| apply_batch(s, body.ops))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(SyncReport {
applied,
..Default::default()
}))
}
/// Exchange ops with `hub_url` once: pull new ops and merge them, then push our
/// new ops. Advances the per-hub cursors so the next call transfers only the
/// tail. `http` is a shared [`reqwest::Client`].
pub async fn sync_once(
store: SharedStore,
hub_url: &str,
http: &reqwest::Client,
bearer: Option<&str>,
) -> Result<SyncReport> {
let base = hub_url.trim_end_matches('/');
let mut report = SyncReport::default();
let cursors = {
let hub = hub_url.to_string();
with_store(&store, move |s| s.sync_state(&hub)).await?
};
// --- pull then merge ---
let mut req = http.get(format!("{base}/sync/pull"));
if let Some(after) = &cursors.last_pulled_hlc {
req = req.query(&[("after", after)]);
}
if let Some(token) = bearer {
req = req.bearer_auth(token);
}
let pulled: OpsBody = req.send().await?.error_for_status()?.json().await?;
report.pulled = pulled.ops.len();
if !pulled.ops.is_empty() {
let (applied, max_pulled) = with_store(&store, move |s| apply_batch(s, pulled.ops)).await?;
report.applied = applied;
if let Some(cursor) = max_pulled {
let hub = hub_url.to_string();
with_store(&store, move |s| s.record_sync(&hub, None, Some(&cursor))).await?;
}
}
// --- push our tail ---
let to_push = {
let after = cursors.last_pushed_hlc.clone();
with_store(&store, move |s| s.ops_since(after.as_deref())).await?
};
report.pushed = to_push.len();
if !to_push.is_empty() {
// `ops_since` returns HLC order, so the last is the new cursor.
let max_pushed = to_push.last().map(|o| o.hlc.clone());
let mut req = http
.post(format!("{base}/sync/push"))
.json(&OpsBody { ops: to_push });
if let Some(token) = bearer {
req = req.bearer_auth(token);
}
req.send().await?.error_for_status()?;
if let Some(cursor) = max_pushed {
let hub = hub_url.to_string();
with_store(&store, move |s| s.record_sync(&hub, Some(&cursor), None)).await?;
}
}
Ok(report)
}