From 8c25d114c4294c82503d565446f4d72ea53fdf97 Mon Sep 17 00:00:00 2001 From: Erich Blume Date: Mon, 1 Jun 2026 15:14:20 -0700 Subject: [PATCH] =?UTF-8?q?hephd:=20network=20sync=20over=20HTTP=20?= =?UTF-8?q?=E2=80=94=20hub=20+=20spoke=20(sync=209a)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire the existing merge engine over the network so the everyday config (local + hub_url) syncs through a hub. Transport ratified = axum HTTP/JSON (tech-spec §6.1, §12). - heph-core: SyncCursors model + Store::sync_state/record_sync over the sync_state table (per-peer push/pull HLC cursors). Incremental, so each exchange transfers only the tail. - hephd::sync: the hub router (POST /sync/push, GET /sync/pull?after=) served from the shared LocalStore, and sync_once — a spoke's pull-then- merge, then push-tail exchange, advancing the cursors. Idempotent: a re-pushed op the hub already has is a no-op. - Daemon carries optional hub config; sync.now/sync.status handled at the daemon (they need the hub transport the store can't reach). conflicts. list/resolve now reachable over the unix socket too. - main: --mode local|server, --hub-url, --http-addr. server mode binds the hub HTTP endpoint on the same store; a local+hub_url spoke background- syncs on a 30s interval. - tests/sync_http.rs: two spokes converge through a real-HTTP hub on an ephemeral port — node propagation and a divergent-scalar conflict. Unauthenticated/single-owner for now; OIDC + per-user scoping is slice 10, client mode + RemoteStore is 9b. 100 tests green; clippy -D warnings + fmt + prek clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 606 +++++++++++++++++++++++ Cargo.toml | 5 + README.md | 9 +- crates/heph-core/src/lib.rs | 2 +- crates/heph-core/src/model.rs | 12 + crates/heph-core/src/sqlite/mod.rs | 18 +- crates/heph-core/src/sqlite/syncstate.rs | 47 ++ crates/heph-core/src/store.rs | 13 +- crates/heph-core/tests/convergence.rs | 30 +- crates/hephd/Cargo.toml | 2 + crates/hephd/src/lib.rs | 2 + crates/hephd/src/main.rs | 79 ++- crates/hephd/src/rpc.rs | 13 + crates/hephd/src/server.rs | 122 ++++- crates/hephd/src/sync.rs | 176 +++++++ crates/hephd/tests/sync_http.rs | 135 +++++ docs/changelog.d/v1-prototype.feature.md | 1 + docs/reference/tech-spec.md | 9 +- 18 files changed, 1239 insertions(+), 42 deletions(-) create mode 100644 crates/heph-core/src/sqlite/syncstate.rs create mode 100644 crates/hephd/src/sync.rs create mode 100644 crates/hephd/tests/sync_http.rs diff --git a/Cargo.lock b/Cargo.lock index 2241351..296ab67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,12 +119,76 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bit-set" version = "0.8.0" @@ -290,6 +354,17 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "displaydoc" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "errno" version = "0.3.14" @@ -354,6 +429,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fs4" version = "0.12.0" @@ -364,6 +448,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", +] + [[package]] name = "futures-core" version = "0.3.32" @@ -461,9 +554,11 @@ name = "hephd" version = "0.0.0" dependencies = [ "anyhow", + "axum", "clap", "fs4", "heph-core", + "reqwest", "serde", "serde_json", "tempfile", @@ -473,6 +568,95 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "http" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "base64", + "bytes", + "futures-channel", + "futures-util", + "http", + "http-body", + "hyper", + "ipnet", + "libc", + "percent-encoding", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -497,6 +681,115 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" +dependencies = [ + "displaydoc", + "potential_utf", + "utf8_iter", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" + +[[package]] +name = "icu_properties" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" + +[[package]] +name = "icu_provider" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -556,6 +849,12 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" +[[package]] +name = "litemap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" + [[package]] name = "lock_api" version = "0.4.14" @@ -580,12 +879,24 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "mio" version = "1.2.1" @@ -655,6 +966,12 @@ dependencies = [ "regex", ] +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + [[package]] name = "phf" version = "0.11.3" @@ -705,6 +1022,15 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "potential_utf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" +dependencies = [ + "zerovec", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -865,6 +1191,38 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "reqwest" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" +dependencies = [ + "base64", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "rrule" version = "0.13.0" @@ -937,6 +1295,12 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "scopeguard" version = "1.2.0" @@ -986,6 +1350,29 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1038,6 +1425,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" @@ -1055,6 +1448,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tempfile" version = "3.27.0" @@ -1117,6 +1530,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "tinystr" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.52.3" @@ -1143,12 +1566,59 @@ dependencies = [ "syn", ] +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" +dependencies = [ + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", + "url", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -1204,6 +1674,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "ulid" version = "1.2.1" @@ -1232,6 +1708,24 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1265,6 +1759,15 @@ dependencies = [ "libc", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1293,6 +1796,16 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.122" @@ -1325,6 +1838,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web-time" version = "1.1.0" @@ -1491,6 +2014,35 @@ version = "0.57.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" +[[package]] +name = "writeable" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" + +[[package]] +name = "yoke" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "yrs" version = "0.26.0" @@ -1529,6 +2081,60 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/Cargo.toml b/Cargo.toml index 05610a1..a2a291d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,11 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } clap = { version = "4", features = ["derive"] } fs4 = "0.12" +axum = "0.8" +reqwest = { version = "0.13", default-features = false, features = [ + "json", + "query", +] } [profile.release] lto = "thin" diff --git a/README.md b/README.md index 51f4dfd..f706e25 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision ## Status -**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. The **local-only system is feature-complete and the offline-sync merge engine converges** (including a `yrs` text-CRDT for bodies); remaining work is the network transport, auth, and the Neovim plugin. Built test-first (97 tests at last update). The canonical tracker is **tech-spec §14**. +**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. The **local system is feature-complete and replicas now sync through a hub over HTTP** — the offline-first everyday config (`local` + `hub_url`) converges end-to-end, with a `yrs` text-CRDT merging bodies. Remaining: the online-only `client` mode, auth, and the Neovim plugin. Built test-first (100 tests at last update). The canonical tracker is **tech-spec §14**. | Area | State | |---|---| @@ -20,7 +20,8 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision | `heph` CLI; `list` / `health` / `journal` / full-text `search` (FTS5) | ✅ done | | Sync engine — HLC, op-log, converging merge + conflict queue (no network yet) | ✅ done | | yrs text-CRDT for body merge | ✅ done | -| `server`/`client` modes + network push/pull sync | ⏳ next | +| `server` (hub) mode + spoke push/pull sync over HTTP (axum) | ✅ done | +| `client` mode + `RemoteStore` (online-only, no replica) | ⏳ next | | OIDC/Authentik auth + per-user isolation | ⏳ | | `heph.nvim` (primary surface) | ⏳ | @@ -29,7 +30,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision A Cargo workspace, layered so the same core runs from a laptop to a hub: - **`crates/heph-core`** — the library: data model, the `Store` trait + SQLite store, markdown parsing/extraction, recurrence, the "what is next?" engine, and the sync engine (op-log, hybrid logical clocks, CRDT/LWW merge, conflict detection). Synchronous and clock-injected (no ambient wall-clock reads) so ranking and merge are deterministic. -- **`crates/hephd`** — the per-device daemon. One binary, three modes — **`local`** (own SQLite replica), **`server`** (also a network endpoint + sync hub), **`client`** (thin, remote) — selected by configuration via a targetable `Store` backend. Surfaces connect to it over a unix socket; it owns the DB handle and (later) background sync. +- **`crates/hephd`** — the per-device daemon. One binary, three modes — **`local`** (own SQLite replica; a syncing spoke when given `--hub-url`), **`server`** (also the sync hub: an HTTP endpoint others sync against), **`client`** *(planned)* (thin, remote, no replica) — selected by configuration via a targetable `Store` backend. Surfaces connect to it over a unix socket; it owns the DB handle and background sync. - **`crates/heph`** — the CLI: a thin client of the daemon (no direct DB access). - **`heph.nvim/`** *(planned)* — the Neovim plugin, the primary editing/agenda surface. @@ -82,7 +83,7 @@ mise run ai-docs # docs AI agents read firs ``` ./Cargo.toml # workspace manifest ./crates/heph-core/ # core library: model, store, extraction, recurrence, ranking, sync -./crates/hephd/ # daemon: local mode (JSON-RPC over a unix socket); server/client planned +./crates/hephd/ # daemon: local + server (hub) modes — unix-socket RPC + HTTP sync; client planned ./crates/heph/ # CLI: thin client of the daemon ./heph.nvim/ # Neovim plugin (planned) ./docs/ # Diataxis docs (design, tech-spec, how-to), Quartz config diff --git a/crates/heph-core/src/lib.rs b/crates/heph-core/src/lib.rs index 90f71ac..72bb5bc 100644 --- a/crates/heph-core/src/lib.rs +++ b/crates/heph-core/src/lib.rs @@ -28,7 +28,7 @@ pub use extract::{extract, ContextItem, Extraction}; pub use hlc::{Hlc, HlcClock}; pub use model::{ deterministic_id, Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, - NodeKind, Task, TaskState, + NodeKind, SyncCursors, Task, TaskState, }; pub use oplog::Op; pub use ranking::{rank, Dimension, RankedTask, RANKING}; diff --git a/crates/heph-core/src/model.rs b/crates/heph-core/src/model.rs index 879e0cf..f6d7a5e 100644 --- a/crates/heph-core/src/model.rs +++ b/crates/heph-core/src/model.rs @@ -289,6 +289,18 @@ pub struct Conflict { pub created_at: i64, } +/// Per-peer sync cursors (a row of `sync_state`, tech-spec §12). Each is the +/// HLC of the last op exchanged with that peer in each direction, so sync only +/// ever transfers the tail. `None` means "nothing yet" (transfer from the +/// beginning). +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct SyncCursors { + /// HLC of the last op we pushed to the peer. + pub last_pushed_hlc: Option, + /// HLC of the last op we pulled from the peer. + pub last_pulled_hlc: Option, +} + /// Deterministic id for key-unique kinds (`journal`/`tag`) so two offline /// replicas that independently create the same logical singleton converge /// (tech-spec §3.1, [[design]] §3.1). Content nodes use random ULIDs instead. diff --git a/crates/heph-core/src/sqlite/mod.rs b/crates/heph-core/src/sqlite/mod.rs index 93103b4..f09821f 100644 --- a/crates/heph-core/src/sqlite/mod.rs +++ b/crates/heph-core/src/sqlite/mod.rs @@ -16,6 +16,7 @@ mod log; mod migrations; mod nodes; mod ops; +mod syncstate; mod tasks; pub use migrations::latest_version; @@ -29,7 +30,8 @@ use crate::clock::Clock; use crate::error::{Error, Result}; use crate::hlc::Hlc; use crate::model::{ - Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState, + Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, SyncCursors, Task, + TaskState, }; use crate::oplog::Op; use crate::ranking::RankedTask; @@ -314,6 +316,20 @@ impl Store for LocalStore { Ok(()) } + fn sync_state(&self, peer: &str) -> Result { + syncstate::get(&self.conn, peer) + } + + fn record_sync( + &mut self, + peer: &str, + pushed: Option<&str>, + pulled: Option<&str>, + ) -> Result<()> { + let now = self.clock.now_ms(); + syncstate::record(&self.conn, peer, pushed, pulled, now) + } + fn conflicts_list(&self) -> Result> { apply::list_conflicts(&self.conn, &self.owner_id) } diff --git a/crates/heph-core/src/sqlite/syncstate.rs b/crates/heph-core/src/sqlite/syncstate.rs new file mode 100644 index 0000000..cd72952 --- /dev/null +++ b/crates/heph-core/src/sqlite/syncstate.rs @@ -0,0 +1,47 @@ +//! `sync_state` table operations — per-peer push/pull HLC cursors (tech-spec +//! §12). A spoke tracks, for each hub it syncs with, the HLC of the last op it +//! pushed and the last it pulled, so each exchange transfers only the tail. + +use rusqlite::{Connection, OptionalExtension}; + +use crate::error::Result; +use crate::model::SyncCursors; + +/// Read the cursors for `peer`, or empty cursors if never synced. +pub(super) fn get(conn: &Connection, peer: &str) -> Result { + let row: Option<(Option, Option)> = conn + .query_row( + "SELECT last_pushed_hlc, last_pulled_hlc FROM sync_state WHERE peer = ?1", + [peer], + |r| Ok((r.get(0)?, r.get(1)?)), + ) + .optional()?; + Ok(match row { + Some((last_pushed_hlc, last_pulled_hlc)) => SyncCursors { + last_pushed_hlc, + last_pulled_hlc, + }, + None => SyncCursors::default(), + }) +} + +/// Advance `peer`'s cursors. A `None` direction is left unchanged (COALESCE +/// keeps the prior value on update). Upserts the row. +pub(super) fn record( + conn: &Connection, + peer: &str, + pushed: Option<&str>, + pulled: Option<&str>, + now: i64, +) -> Result<()> { + conn.execute( + "INSERT INTO sync_state (peer, last_pushed_hlc, last_pulled_hlc, updated_at) + VALUES (?1, ?2, ?3, ?4) + ON CONFLICT(peer) DO UPDATE SET + last_pushed_hlc = COALESCE(?2, last_pushed_hlc), + last_pulled_hlc = COALESCE(?3, last_pulled_hlc), + updated_at = ?4", + (peer, pushed, pulled, now), + )?; + Ok(()) +} diff --git a/crates/heph-core/src/store.rs b/crates/heph-core/src/store.rs index c4aaea6..d1fc704 100644 --- a/crates/heph-core/src/store.rs +++ b/crates/heph-core/src/store.rs @@ -6,7 +6,8 @@ use crate::error::Result; use crate::model::{ - Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, Task, TaskState, + Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, SyncCursors, Task, + TaskState, }; use crate::oplog::Op; use crate::ranking::RankedTask; @@ -127,6 +128,16 @@ pub trait Store { /// incl. owner-embedded deterministic ids, is refined with auth.) fn adopt_owner(&mut self, canonical: &str) -> Result<()>; + /// The push/pull HLC cursors for a sync `peer` (the hub url). Defaults to + /// empty cursors when this replica has never synced with `peer` (§12). + fn sync_state(&self, peer: &str) -> Result; + + /// Record progress with a sync `peer`: advance the `pushed`/`pulled` HLC + /// cursors (each `None` leaves that direction unchanged). Upserts the + /// `sync_state` row (§12). + fn record_sync(&mut self, peer: &str, pushed: Option<&str>, pulled: Option<&str>) + -> Result<()>; + /// Open merge conflicts surfaced for the user (`heph conflicts`). fn conflicts_list(&self) -> Result>; diff --git a/crates/heph-core/tests/convergence.rs b/crates/heph-core/tests/convergence.rs index aa50b64..f84af1e 100644 --- a/crates/heph-core/tests/convergence.rs +++ b/crates/heph-core/tests/convergence.rs @@ -4,7 +4,9 @@ //! through the yrs text CRDT instead of clobbering with last-writer-wins. No //! network yet — we hand ops across directly. -use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Op, Store, TaskState}; +use heph_core::{ + Attention, Clock, LocalStore, NewNode, NewTask, Op, Store, SyncCursors, TaskState, +}; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; @@ -46,6 +48,32 @@ fn sync_one_way(src: &dyn Store, dst: &mut dyn Store, cursor: Option<&str>) -> O last } +#[test] +fn sync_cursors_default_empty_then_advance_per_direction() { + let (mut a, _ca) = replica(1000); + const HUB: &str = "https://hub.example"; + + assert_eq!(a.sync_state(HUB).unwrap(), SyncCursors::default()); + + // Advancing only the push cursor leaves pull untouched, and vice versa. + a.record_sync(HUB, Some("hlc-push-1"), None).unwrap(); + a.record_sync(HUB, None, Some("hlc-pull-1")).unwrap(); + assert_eq!( + a.sync_state(HUB).unwrap(), + SyncCursors { + last_pushed_hlc: Some("hlc-push-1".into()), + last_pulled_hlc: Some("hlc-pull-1".into()), + } + ); + + // A second peer is tracked independently. + a.record_sync("other", Some("x"), None).unwrap(); + assert_eq!( + a.sync_state(HUB).unwrap().last_pushed_hlc.as_deref(), + Some("hlc-push-1") + ); +} + #[test] fn online_round_trip_propagates_a_node() { let (mut a, _ca) = replica(1000); diff --git a/crates/hephd/Cargo.toml b/crates/hephd/Cargo.toml index d9f751c..d15d0b8 100644 --- a/crates/hephd/Cargo.toml +++ b/crates/hephd/Cargo.toml @@ -27,6 +27,8 @@ tracing.workspace = true tracing-subscriber.workspace = true clap.workspace = true fs4.workspace = true +axum.workspace = true +reqwest.workspace = true [dev-dependencies] tempfile = "3" diff --git a/crates/hephd/src/lib.rs b/crates/hephd/src/lib.rs index f54e0f6..85ebbfe 100644 --- a/crates/hephd/src/lib.rs +++ b/crates/hephd/src/lib.rs @@ -12,6 +12,7 @@ pub mod clock; pub mod lock; pub mod rpc; pub mod server; +pub mod sync; use std::path::PathBuf; @@ -19,6 +20,7 @@ pub use client::Client; pub use clock::SystemClock; pub use lock::LockGuard; pub use server::Daemon; +pub use sync::{sync_once, SyncReport}; /// Default unix socket path: `$XDG_RUNTIME_DIR/heph/hephd.sock`, falling back to /// the system temp dir when `XDG_RUNTIME_DIR` is unset (tech-spec §3). diff --git a/crates/hephd/src/main.rs b/crates/hephd/src/main.rs index 5e0c1c1..de54dfb 100644 --- a/crates/hephd/src/main.rs +++ b/crates/hephd/src/main.rs @@ -1,18 +1,42 @@ -//! `hephd` binary — starts the daemon in `local` mode (slice 6). +//! `hephd` binary — starts the daemon in `local` or `server` mode. +//! +//! Both modes own the local SQLite file (exclusive lock) and serve surfaces +//! over a unix socket. **server** additionally exposes the hub HTTP endpoint for +//! spokes to sync against; a **local** instance given `--hub-url` becomes a +//! syncing spoke that background-exchanges its op-log with that hub (tech-spec +//! §3.1, §6.1, §12). `client` mode (no local replica) is a later slice. use std::path::PathBuf; +use std::time::Duration; use anyhow::{Context, Result}; -use clap::Parser; -use tokio::net::UnixListener; +use clap::{Parser, ValueEnum}; +use tokio::net::{TcpListener, UnixListener}; use heph_core::LocalStore; -use hephd::{default_db_path, default_socket_path, Daemon, LockGuard, SystemClock}; +use hephd::{default_db_path, default_socket_path, sync, Daemon, LockGuard, SystemClock}; + +/// How often a spoke background-syncs with its hub. +const SYNC_INTERVAL: Duration = Duration::from_secs(30); +/// Default hub HTTP bind address in server mode. +const DEFAULT_HTTP_ADDR: &str = "127.0.0.1:8787"; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] +enum Mode { + /// Own replica; no inbound network endpoint (syncing spoke if `--hub-url`). + Local, + /// Also a sync hub: exposes the authenticated network endpoint over HTTP. + Server, +} /// The Hephaestus per-device daemon. #[derive(Parser, Debug)] #[command(name = "hephd", version, about)] struct Cli { + /// Runtime mode. + #[arg(long, value_enum, default_value_t = Mode::Local)] + mode: Mode, + /// Path to the SQLite store file. #[arg(long)] db: Option, @@ -20,6 +44,14 @@ struct Cli { /// Path to the unix socket to listen on. #[arg(long)] socket: Option, + + /// Hub to background-sync this replica's op-log with (makes it a spoke). + #[arg(long)] + hub_url: Option, + + /// Address for the hub HTTP endpoint (server mode only). + #[arg(long)] + http_addr: Option, } #[tokio::main] @@ -47,6 +79,41 @@ async fn main() -> Result<()> { // Take the exclusive lock before opening the store (tech-spec §3.1). let _lock = LockGuard::acquire(&db)?; let store = LocalStore::open(&db, Box::new(SystemClock))?; + let daemon = Daemon::new(store).with_hub(cli.hub_url.clone()); + + // server mode: expose the hub HTTP endpoint over the same store. + if cli.mode == Mode::Server { + let addr = cli + .http_addr + .clone() + .unwrap_or_else(|| DEFAULT_HTTP_ADDR.to_string()); + let app = sync::router(daemon.store()); + let listener = TcpListener::bind(&addr) + .await + .with_context(|| format!("binding hub HTTP endpoint {addr}"))?; + tracing::info!(%addr, "hub HTTP endpoint listening"); + tokio::spawn(async move { + if let Err(e) = axum::serve(listener, app).await { + tracing::error!("hub HTTP endpoint stopped: {e}"); + } + }); + } + + // spoke: background-sync the op-log with the configured hub. + if let Some(hub) = cli.hub_url.clone() { + let store = daemon.store(); + tokio::spawn(async move { + let http = reqwest::Client::new(); + let mut tick = tokio::time::interval(SYNC_INTERVAL); + loop { + tick.tick().await; + match hephd::sync_once(store.clone(), &hub, &http).await { + Ok(report) => tracing::debug!(?report, "background sync"), + Err(e) => tracing::warn!("background sync failed: {e}"), + } + } + }); + } // Replace any stale socket from a previous run, then bind. if socket.exists() { @@ -56,6 +123,6 @@ async fn main() -> Result<()> { let listener = UnixListener::bind(&socket) .with_context(|| format!("binding socket {}", socket.display()))?; - tracing::info!(db = %db.display(), socket = %socket.display(), "hephd local mode listening"); - Daemon::new(store).serve(listener).await + tracing::info!(db = %db.display(), socket = %socket.display(), mode = ?cli.mode, "hephd listening"); + daemon.serve(listener).await } diff --git a/crates/hephd/src/rpc.rs b/crates/hephd/src/rpc.rs index 29c5b3f..d6762b7 100644 --- a/crates/hephd/src/rpc.rs +++ b/crates/hephd/src/rpc.rs @@ -186,6 +186,13 @@ struct ExportParams { path: String, } +#[derive(Deserialize)] +struct ConflictResolveParams { + id: String, + /// `"local"` or `"remote"` — the value the user chooses to keep. + choice: String, +} + /// Default `next`/`list` result size (tech-spec §6). const DEFAULT_LIMIT: usize = 5; /// Default `log.tail` size. @@ -267,6 +274,12 @@ pub fn dispatch(store: &mut dyn Store, method: &str, params: Value) -> Result json!(store.conflicts_list()?), + "conflicts.resolve" => { + let p: ConflictResolveParams = parse(params)?; + store.conflicts_resolve(&p.id, &p.choice)?; + json!({ "ok": true }) + } other => { return Err(RpcError::new( METHOD_NOT_FOUND, diff --git a/crates/hephd/src/server.rs b/crates/hephd/src/server.rs index f4c81b8..8e158f3 100644 --- a/crates/hephd/src/server.rs +++ b/crates/hephd/src/server.rs @@ -4,39 +4,69 @@ //! `heph-core` is synchronous and its SQLite handle is single-writer, so the //! store sits behind an `Arc>`; each request locks it inside a //! `spawn_blocking` task (DB calls never run on an async worker, tech-spec §3). +//! +//! Two methods are handled here rather than in [`rpc::dispatch`] because they +//! need transport the store can't reach: `sync.now` / `sync.status` exchange +//! ops with the configured hub (tech-spec §6.1, §12). use std::sync::{Arc, Mutex}; use anyhow::Result; -use serde_json::Value; +use serde_json::{json, Value}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; -use heph_core::LocalStore; +use heph_core::{LocalStore, Store}; -use crate::rpc::{self, Request, Response, RpcError, PARSE_ERROR}; +use crate::rpc::{self, Request, Response, RpcError, INTERNAL_ERROR, PARSE_ERROR}; +use crate::sync; + +/// The shared, cheaply-cloneable context each connection serves from. +#[derive(Clone)] +struct Ctx { + store: Arc>, + /// The hub this device syncs with, if it is a spoke (`local` + `hub_url`). + hub_url: Option, + http: reqwest::Client, +} /// A running daemon over a shared local store. pub struct Daemon { - store: Arc>, + ctx: Ctx, } impl Daemon { /// Wrap an opened store. pub fn new(store: LocalStore) -> Daemon { Daemon { - store: Arc::new(Mutex::new(store)), + ctx: Ctx { + store: Arc::new(Mutex::new(store)), + hub_url: None, + http: reqwest::Client::new(), + }, } } + /// Configure the hub this device syncs with (`sync.now` targets it). + pub fn with_hub(mut self, hub_url: Option) -> Daemon { + self.ctx.hub_url = hub_url; + self + } + + /// The shared store handle, for code that needs to reach the same store the + /// daemon serves (the hub HTTP router and background sync, tech-spec §6.1). + pub fn store(&self) -> Arc> { + self.ctx.store.clone() + } + /// Serve connections on `listener` until the task is cancelled. Each /// connection is handled concurrently; all share the one store. pub async fn serve(&self, listener: UnixListener) -> Result<()> { loop { let (stream, _addr) = listener.accept().await?; - let store = self.store.clone(); + let ctx = self.ctx.clone(); tokio::spawn(async move { - if let Err(e) = handle_connection(stream, store).await { + if let Err(e) = handle_connection(stream, ctx).await { tracing::debug!("connection closed: {e}"); } }); @@ -44,7 +74,7 @@ impl Daemon { } } -async fn handle_connection(stream: UnixStream, store: Arc>) -> Result<()> { +async fn handle_connection(stream: UnixStream, ctx: Ctx) -> Result<()> { let (read_half, mut write_half) = stream.into_split(); let mut lines = BufReader::new(read_half).lines(); @@ -52,7 +82,7 @@ async fn handle_connection(stream: UnixStream, store: Arc>) -> if line.trim().is_empty() { continue; } - let response = process_line(&line, &store).await; + let response = process_line(&line, &ctx).await; let mut out = serde_json::to_string(&response)?; out.push('\n'); write_half.write_all(out.as_bytes()).await?; @@ -61,7 +91,7 @@ async fn handle_connection(stream: UnixStream, store: Arc>) -> Ok(()) } -async fn process_line(line: &str, store: &Arc>) -> Response { +async fn process_line(line: &str, ctx: &Ctx) -> Response { let request: Request = match serde_json::from_str(line) { Ok(r) => r, Err(e) => { @@ -76,26 +106,70 @@ async fn process_line(line: &str, store: &Arc>) -> Response { }; let id = request.id.clone(); - let store = store.clone(); - let method = request.method; - let params = request.params; + let result = match request.method.as_str() { + // Sync methods need the hub transport, which the store can't reach. + "sync.now" => sync_now(ctx).await, + "sync.status" => sync_status(ctx).await, + // Everything else is a pure store call on the blocking pool. + _ => dispatch_blocking(ctx, request.method, request.params).await, + }; - // DB work runs on the blocking pool; the store mutex is held only there. + match result { + Ok(value) => Response::ok(id, value), + Err(rpc_err) => Response::failed(id, rpc_err), + } +} + +/// Run a store method on the blocking pool (DB never touches an async worker). +async fn dispatch_blocking(ctx: &Ctx, method: String, params: Value) -> Result { + let store = ctx.store.clone(); let dispatched = tokio::task::spawn_blocking(move || { let mut guard = store.lock().expect("store mutex poisoned"); rpc::dispatch(&mut *guard, &method, params) }) .await; - match dispatched { - Ok(Ok(result)) => Response::ok(id, result), - Ok(Err(rpc_err)) => Response::failed(id, rpc_err), - Err(join_err) => Response::failed( - id, - RpcError { - code: rpc::INTERNAL_ERROR, - message: format!("dispatch task failed: {join_err}"), - }, - ), + Ok(inner) => inner, + Err(join_err) => Err(RpcError { + code: INTERNAL_ERROR, + message: format!("dispatch task failed: {join_err}"), + }), } } + +/// `sync.now` — exchange ops with the configured hub once. +async fn sync_now(ctx: &Ctx) -> Result { + let Some(hub_url) = ctx.hub_url.clone() else { + return Err(RpcError { + code: INTERNAL_ERROR, + message: "no hub_url configured; this instance is standalone".into(), + }); + }; + match sync::sync_once(ctx.store.clone(), &hub_url, &ctx.http).await { + Ok(report) => Ok(json!(report)), + Err(e) => Err(RpcError { + code: INTERNAL_ERROR, + message: format!("sync failed: {e}"), + }), + } +} + +/// `sync.status` — the hub url and the current per-hub cursors. +async fn sync_status(ctx: &Ctx) -> Result { + let Some(hub_url) = ctx.hub_url.clone() else { + return Ok(json!({ "hub_url": Value::Null })); + }; + let store = ctx.store.clone(); + let hub = hub_url.clone(); + let cursors = tokio::task::spawn_blocking(move || { + let guard = store.lock().expect("store mutex poisoned"); + guard.sync_state(&hub) + }) + .await + .map_err(|e| RpcError { + code: INTERNAL_ERROR, + message: format!("sync.status task failed: {e}"), + })? + .map_err(RpcError::from)?; + Ok(json!({ "hub_url": hub_url, "cursors": cursors })) +} diff --git a/crates/hephd/src/sync.rs b/crates/hephd/src/sync.rs new file mode 100644 index 0000000..a559ac3 --- /dev/null +++ b/crates/hephd/src/sync.rs @@ -0,0 +1,176 @@ +//! Spoke↔hub op-log sync over HTTP (tech-spec §6.1, §12). +//! +//! The merge engine itself lives in `heph-core` (deterministic, transport-free). +//! This module is the **transport**: a [`router`] the **hub** (server mode) +//! mounts, and [`sync_once`] a **spoke** (`local` + `hub_url`) runs to exchange +//! ops with that hub. Both speak JSON over HTTP with two routes: +//! +//! - `POST /sync/push` — the spoke sends its new ops; the hub merges them. +//! - `GET /sync/pull?after=` — the hub returns ops past the spoke's cursor. +//! +//! Exchange is **incremental by HLC cursor** (`sync_state`, [`heph_core::SyncCursors`]): +//! each side transfers only the tail it hasn't sent/seen. Merge is idempotent, +//! so a re-pushed op the hub already has is a harmless no-op. Auth is deferred to +//! tech-spec §13 (slice 10) — the endpoint is currently unauthenticated and +//! scoped to the hub's single owner. + +use std::sync::{Arc, Mutex}; + +use anyhow::Result; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use axum::routing::{get, post}; +use axum::{Json, Router}; +use serde::{Deserialize, Serialize}; + +use heph_core::{LocalStore, Op, Store}; + +/// The shared store handle a hub serves from. +type SharedStore = Arc>; + +/// A batch of ops in flight (push body / pull response). +#[derive(Debug, Serialize, Deserialize)] +pub struct OpsBody { + /// The ops, applied in HLC order by the receiver. + pub ops: Vec, +} + +/// What one [`sync_once`] exchange moved. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct SyncReport { + /// Ops received from the hub. + pub pulled: usize, + /// Of the pulled ops, how many were newly applied (not already seen). + pub applied: usize, + /// Ops sent to the hub. + pub pushed: usize, +} + +/// Run `f` against the locked store on the blocking pool (DB calls never run on +/// an async worker, tech-spec §3). +async fn with_store(store: &SharedStore, f: F) -> Result +where + F: FnOnce(&mut LocalStore) -> heph_core::Result + Send + 'static, + T: Send + 'static, +{ + let store = store.clone(); + let out = tokio::task::spawn_blocking(move || { + let mut guard = store.lock().expect("store mutex poisoned"); + f(&mut guard) + }) + .await?; + Ok(out?) +} + +/// Apply a batch of ops in HLC order, returning how many were newly applied and +/// the highest HLC seen (the new cursor position). +fn apply_batch( + store: &mut LocalStore, + mut ops: Vec, +) -> heph_core::Result<(usize, Option)> { + ops.sort_by(|a, b| a.hlc.cmp(&b.hlc)); + let mut applied = 0; + let mut max_hlc = None; + for op in &ops { + if store.apply_op(op)? { + applied += 1; + } + max_hlc = Some(op.hlc.clone()); + } + Ok((applied, max_hlc)) +} + +/// The hub's HTTP router (server mode). Mount it on a TCP listener. +pub fn router(store: SharedStore) -> Router { + Router::new() + .route("/sync/pull", get(pull)) + .route("/sync/push", post(push)) + .with_state(store) +} + +#[derive(Debug, Deserialize)] +struct PullQuery { + /// HLC cursor — return ops strictly newer than this (absent ⇒ from the start). + #[serde(default)] + after: Option, +} + +/// `GET /sync/pull?after=` — ops past the caller's cursor, HLC order. +async fn pull( + State(store): State, + Query(q): Query, +) -> Result, StatusCode> { + let ops = with_store(&store, move |s| s.ops_since(q.after.as_deref())) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Json(OpsBody { ops })) +} + +/// `POST /sync/push` — merge the caller's ops; reply with how many newly applied. +async fn push( + State(store): State, + Json(body): Json, +) -> Result, StatusCode> { + let (applied, _max) = with_store(&store, move |s| apply_batch(s, body.ops)) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Json(SyncReport { + applied, + ..Default::default() + })) +} + +/// Exchange ops with `hub_url` once: pull new ops and merge them, then push our +/// new ops. Advances the per-hub cursors so the next call transfers only the +/// tail. `http` is a shared [`reqwest::Client`]. +pub async fn sync_once( + store: SharedStore, + hub_url: &str, + http: &reqwest::Client, +) -> Result { + let base = hub_url.trim_end_matches('/'); + let mut report = SyncReport::default(); + + let cursors = { + let hub = hub_url.to_string(); + with_store(&store, move |s| s.sync_state(&hub)).await? + }; + + // --- pull then merge --- + let mut req = http.get(format!("{base}/sync/pull")); + if let Some(after) = &cursors.last_pulled_hlc { + req = req.query(&[("after", after)]); + } + let pulled: OpsBody = req.send().await?.error_for_status()?.json().await?; + report.pulled = pulled.ops.len(); + if !pulled.ops.is_empty() { + let (applied, max_pulled) = with_store(&store, move |s| apply_batch(s, pulled.ops)).await?; + report.applied = applied; + if let Some(cursor) = max_pulled { + let hub = hub_url.to_string(); + with_store(&store, move |s| s.record_sync(&hub, None, Some(&cursor))).await?; + } + } + + // --- push our tail --- + let to_push = { + let after = cursors.last_pushed_hlc.clone(); + with_store(&store, move |s| s.ops_since(after.as_deref())).await? + }; + report.pushed = to_push.len(); + if !to_push.is_empty() { + // `ops_since` returns HLC order, so the last is the new cursor. + let max_pushed = to_push.last().map(|o| o.hlc.clone()); + http.post(format!("{base}/sync/push")) + .json(&OpsBody { ops: to_push }) + .send() + .await? + .error_for_status()?; + if let Some(cursor) = max_pushed { + let hub = hub_url.to_string(); + with_store(&store, move |s| s.record_sync(&hub, Some(&cursor), None)).await?; + } + } + + Ok(report) +} diff --git a/crates/hephd/tests/sync_http.rs b/crates/hephd/tests/sync_http.rs new file mode 100644 index 0000000..de4194e --- /dev/null +++ b/crates/hephd/tests/sync_http.rs @@ -0,0 +1,135 @@ +//! Network sync over real HTTP (tech-spec §6.1, §12, slice 9a). A hub (the +//! `sync::router`) runs on an ephemeral TCP port; two spoke replicas exchange +//! ops with it via `sync_once` and converge — exactly the offline-first +//! everyday config (`local` + `hub_url`). The merge logic is `heph-core`'s, +//! proven in its own convergence tests; here we prove the transport carries it. + +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::{Arc, Mutex}; + +use heph_core::{Attention, Clock, LocalStore, NewNode, NewTask, Store, TaskState}; +use hephd::sync; + +/// Every replica + the hub adopt this one canonical owner (tech-spec §13). +const OWNER: &str = "canonical-user"; + +#[derive(Clone)] +struct StepClock(Arc); +impl StepClock { + fn new(ms: i64) -> Self { + StepClock(Arc::new(AtomicI64::new(ms))) + } + fn set(&self, ms: i64) { + self.0.store(ms, Ordering::SeqCst); + } +} +impl Clock for StepClock { + fn now_ms(&self) -> i64 { + self.0.load(Ordering::SeqCst) + } +} + +type Shared = Arc>; + +/// A replica backed by a temp SQLite file, sharing the canonical owner. The +/// `TempDir` is returned so the caller keeps the file alive. +fn replica(now: i64) -> (Shared, StepClock, tempfile::TempDir) { + let dir = tempfile::tempdir().unwrap(); + let clock = StepClock::new(now); + let mut store = LocalStore::open(dir.path().join("heph.db"), Box::new(clock.clone())).unwrap(); + store.adopt_owner(OWNER).unwrap(); + (Arc::new(Mutex::new(store)), clock, dir) +} + +/// Start the hub router on an ephemeral port; return its base URL. The serve +/// task and the hub's `TempDir` are leaked for the test's lifetime. +async fn start_hub() -> String { + let (hub, _clock, dir) = replica(1000); + Box::leak(Box::new(dir)); // keep the hub DB file alive + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let app = sync::router(hub); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + format!("http://{addr}") +} + +#[tokio::test] +async fn a_node_propagates_a_to_hub_to_b() { + let hub_url = start_hub().await; + let http = reqwest::Client::new(); + let (a, _ca, _da) = replica(1000); + let (b, _cb, _db) = replica(1000); + + let id = { + let mut ga = a.lock().unwrap(); + ga.create_node(NewNode::doc("Roof", "shingles need work")) + .unwrap() + .id + }; + + // A pushes to the hub; B pulls from it. + let up = sync::sync_once(a.clone(), &hub_url, &http).await.unwrap(); + assert!(up.pushed > 0, "A pushed nothing"); + let down = sync::sync_once(b.clone(), &hub_url, &http).await.unwrap(); + assert!(down.applied > 0, "B applied nothing"); + + let on_b = b.lock().unwrap().get_node(&id).unwrap().expect("reached B"); + assert_eq!(on_b.title, "Roof"); + assert_eq!(on_b.body.as_deref(), Some("shingles need work")); +} + +#[tokio::test] +async fn divergent_scalar_edits_converge_through_the_hub_with_a_conflict() { + let hub_url = start_hub().await; + let http = reqwest::Client::new(); + let (a, ca, _da) = replica(1000); + let (b, cb, _db) = replica(1000); + + // A creates a task and both replicas learn it through the hub. + let task_id = { + let mut ga = a.lock().unwrap(); + ga.create_task(NewTask { + title: "Renew passport".into(), + attention: Some(Attention::Orange), + ..Default::default() + }) + .unwrap() + .node_id + }; + sync::sync_once(a.clone(), &hub_url, &http).await.unwrap(); + sync::sync_once(b.clone(), &hub_url, &http).await.unwrap(); + + // Divergent offline edits on conflict-tracked fields; B's is later (higher + // HLC) so its whole scalar snapshot wins. + ca.set(2000); + a.lock() + .unwrap() + .set_task_state(&task_id, TaskState::Done) + .unwrap(); + cb.set(3000); + b.lock() + .unwrap() + .set_task_attention(&task_id, Attention::Red) + .unwrap(); + + // A few exchanges in each direction settle it. + for _ in 0..2 { + sync::sync_once(a.clone(), &hub_url, &http).await.unwrap(); + sync::sync_once(b.clone(), &hub_url, &http).await.unwrap(); + } + + let ta = a.lock().unwrap().get_task(&task_id).unwrap().unwrap(); + let tb = b.lock().unwrap().get_task(&task_id).unwrap().unwrap(); + assert_eq!(ta, tb, "replicas did not converge: {ta:?} vs {tb:?}"); + assert_eq!(ta.attention, Some(Attention::Red), "later HLC should win"); + assert!( + !a.lock().unwrap().conflicts_list().unwrap().is_empty(), + "A recorded no conflict" + ); + assert!( + !b.lock().unwrap().conflicts_list().unwrap().is_empty(), + "B recorded no conflict" + ); +} diff --git a/docs/changelog.d/v1-prototype.feature.md b/docs/changelog.d/v1-prototype.feature.md index 2ecef02..92cbacb 100644 --- a/docs/changelog.d/v1-prototype.feature.md +++ b/docs/changelog.d/v1-prototype.feature.md @@ -9,4 +9,5 @@ Begin the v1 prototype (Phase 1, tech-spec §11.1), built in TDD slices: - `heph` CLI (§1) — a thin client of the daemon: `next`, `task`, `doc`, `get`, `export`. Export materializes the store to a `/.md` tree with YAML frontmatter + body (§5), one-way, tombstones excluded. - Sync engine, local-only (§12): real hybrid logical clock + persistent device `origin`; an append-only op-log per mutation; an idempotent, order-independent merge/apply engine — last-writer-wins task scalars (discards surfaced in a `conflicts` queue), OR-set links, monotonic tombstones. Two-replica convergence proven. - Body text CRDT (§5, §12, slice 8d): node bodies now merge through the `yrs` text CRDT (`body_crdt`) instead of last-writer-wins — whole-buffer writes are diffed into the doc and the yrs delta rides the op, so concurrent edits to different regions both survive and never enqueue a conflict. +- Network sync over HTTP (§6.1, §12, slice 9a): `hephd --mode server` exposes a sync hub (`POST /sync/push`, `GET /sync/pull?after=`, axum) over the same store; `hephd --mode local --hub-url ` becomes a spoke that background-syncs its op-log with that hub (and on demand via the `sync.now`/`sync.status` RPC). Exchange is incremental by HLC cursor (`sync_state`) and idempotent. The merge engine is `heph-core`'s, unchanged. Unauthenticated/single-owner for now (auth lands with OIDC). `conflicts.list`/`conflicts.resolve` are now reachable over the daemon socket. - CI runs the Rust suite (fmt/clippy/test) via the project build hook. diff --git a/docs/reference/tech-spec.md b/docs/reference/tech-spec.md index f696f70..7c6eaef 100644 --- a/docs/reference/tech-spec.md +++ b/docs/reference/tech-spec.md @@ -326,7 +326,7 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi ## 14. Implementation status (Phase 1 tracker) -> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **97 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet). +> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **100 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet). **Done** @@ -334,16 +334,17 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi - ✅ **Markdown handling (§5):** wiki-link + checkbox extraction (pure, idempotent, code-aware); `update_node` materializes/reconciles `wiki` links; `export` to a `/.md` tree. - ✅ **Recurrence (§4.4):** roll-forward in place — fresh checklist, logged occurrence, advance-skipping-misses; completion never carries forward (proptest). Per-task logs; `skip`. - ✅ **Ranking (§7):** pure two-stage filter + reorderable named dimensions; proptest total order. -- ✅ **Daemon RPC (§6) — local subset:** node.get/create/update/tombstone, task.create/set_state/set_attention/skip, next, list, health, journal.open_or_create, search, links.outgoing/backlinks, log.append/tail, export, conflicts.list/resolve, sync (ops_since/apply_op). Line-delimited JSON-RPC over a unix socket; sync `Client`. -- ✅ **Runtime modes (§3.1) — `local` only:** exclusive file-lock handoff via `LockGuard`. +- ✅ **Daemon RPC (§6):** node.get/create/update/tombstone, task.create/set_state/set_attention/skip, next, list, health, journal.open_or_create, search, links.outgoing/backlinks, log.append/tail, export, conflicts.list/resolve, sync.now/sync.status. Line-delimited JSON-RPC over a unix socket; sync `Client`. (`ops_since`/`apply_op` are `Store` methods exchanged over the hub HTTP endpoint, not the unix socket.) +- ✅ **Runtime modes (§3.1) — `local` + `server`:** exclusive file-lock handoff via `LockGuard`; `--mode local|server`, `--hub-url`, `--http-addr`. `client` (no replica) is a later slice. - ✅ **Sync engine (§12) minus network:** HLC (clock-injected, monotonic) + persistent device `origin`; op-log per mutation; `apply_op` merge — **LWW** task scalars + titles with a **conflict queue**, **OR-set** links, monotonic tombstones, idempotent; two-replica convergence proven. `adopt_owner` = basic §13 canonical-owner adoption. - ✅ **Body text CRDT (§5, §12, slice 8d):** node bodies merge through the **`yrs`** text CRDT (`body_crdt` BLOB) instead of LWW. A device authors under a stable `client_id` derived from its `origin`; whole-buffer writes are diffed (common prefix/suffix, char-boundary safe) into the doc; the yrs delta rides the `node.create`/`node.set` op (`body_crdt` field) and `apply` merges it — concurrent disjoint edits both survive and never enqueue a conflict. +- ✅ **Network sync (§6.1, §12, slice 9a):** **transport ratified = `axum` HTTP/JSON.** The hub (`server` mode) exposes `POST /sync/push` + `GET /sync/pull?after=` over the same store; a spoke (`local` + `hub_url`) runs `sync::sync_once` (pull→merge, then push) and background-syncs on a 30s interval. Incremental by HLC cursor (`sync_state`/`SyncCursors`); idempotent re-push is a no-op. Two spokes converge through a real-HTTP hub (incl. scalar conflict) in `tests/sync_http.rs`. **Unauthenticated for now, single-owner** (auth + per-user scoping is slice 10). - ✅ **CLI (§1):** `heph` next/task/doc/get/export/search/journal. - ✅ **CI (§9):** `.forgejo/scripts/build` runs fmt/clippy/test (self-bootstrapping rustup). **Not yet done (resume order)** -1. ⏳ **`server`/`client` modes + network sync (§3.1, §6.1, §12):** `RemoteStore`; hub network endpoint; push/pull by HLC cursor (the merge logic already exists); background sync; `sync.now`/`sync.status` RPC; multi-replica-over-real-sockets tests. Open: transport (`axum` HTTP/JSON vs gRPC), propagation cadence, device-id/hub registration. +1. ⏳ **`client` mode + `RemoteStore` (§3.1, slice 9b):** a no-replica spoke that proxies every `Store` method to a `server` over HTTP — the online-only escape hatch (borrowed box, CI, future web backend). Open: how the proxy maps the mutating `Store` surface; `sync.now` is N/A for `client`. 2. ⏳ **OIDC/Authentik auth (§13):** device-code flow, bearer token on the hub endpoint, full per-user isolation, adoption-with-deterministic-ids. 3. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner).