From ed8c7a733a01f2a5331854d65baaa49b0ee0da5b Mon Sep 17 00:00:00 2001 From: Erich Blume Date: Sun, 31 May 2026 20:28:15 -0700 Subject: [PATCH] hephd local mode: file lock + JSON-RPC over unix socket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slice 6 (tech-spec §3, §6, §10). First async component — the per-device daemon in local mode. - `LockGuard`: exclusive advisory flock on a sidecar `.lock`; a second acquire fails and releases on drop (the §3.1 lock handoff). - JSON-RPC (line-delimited): `rpc::dispatch` maps node/task/next/links/log methods onto the heph-core Store; `Daemon::serve` accepts unix-socket connections and runs dispatch on tokio's blocking pool behind an Arc> (DB never touches an async worker). - Synchronous `Client` for surfaces/CLI; `hephd` binary (clap) opens the store under lock and serves the default socket. - heph-core model/ranking types are now serde-(de)serializable; added node.tombstone + Store::tombstone_node. Tests: 2 lock unit tests + 5 real-socket e2e (round-trip with clock injection, next, error paths, recurring roll-forward over RPC, 8-client concurrency). 60 tests green. Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 2 +- Cargo.lock | 488 ++++++++++++++++++++++- Cargo.toml | 16 +- crates/heph-core/Cargo.toml | 1 + crates/heph-core/src/model.rs | 26 +- crates/heph-core/src/ranking.rs | 4 +- crates/heph-core/src/sqlite/mod.rs | 5 + crates/heph-core/src/sqlite/nodes.rs | 13 + crates/heph-core/src/store.rs | 3 + crates/hephd/Cargo.toml | 32 ++ crates/hephd/src/client.rs | 61 +++ crates/hephd/src/clock.rs | 20 + crates/hephd/src/lib.rs | 40 ++ crates/hephd/src/lock.rs | 91 +++++ crates/hephd/src/main.rs | 61 +++ crates/hephd/src/rpc.rs | 227 +++++++++++ crates/hephd/src/server.rs | 101 +++++ crates/hephd/tests/rpc_socket.rs | 215 ++++++++++ docs/changelog.d/v1-prototype.feature.md | 1 + 19 files changed, 1390 insertions(+), 17 deletions(-) create mode 100644 crates/hephd/Cargo.toml create mode 100644 crates/hephd/src/client.rs create mode 100644 crates/hephd/src/clock.rs create mode 100644 crates/hephd/src/lib.rs create mode 100644 crates/hephd/src/lock.rs create mode 100644 crates/hephd/src/main.rs create mode 100644 crates/hephd/src/rpc.rs create mode 100644 crates/hephd/src/server.rs create mode 100644 crates/hephd/tests/rpc_socket.rs diff --git a/AGENTS.md b/AGENTS.md index 53ec884..01016dd 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -47,7 +47,7 @@ A Cargo workspace (`Cargo.toml` at the root) plus the Neovim plugin and repo too ./Cargo.toml # workspace manifest (shared deps + members) ./crates/heph-core/ # core lib: data model, Store trait + SQLite store, extraction, # recurrence, "what is next?" ranking, op-log/HLC/CRDT sync -./crates/hephd/ # daemon (planned): local/server/client modes; JSON-RPC over unix socket +./crates/hephd/ # daemon: local mode done (JSON-RPC over unix socket + file lock); server/client modes planned ./crates/heph/ # CLI (planned): export, scripting, `heph conflicts` ./heph.nvim/ # Neovim plugin (planned): primary surface; replaces obsidian.nvim ./docs/ # Diataxis docs (incl. [[design]] + [[tech-spec]]), Quartz config, release content diff --git a/Cargo.lock b/Cargo.lock index 5772df3..20bf208 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,62 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + [[package]] name = "autocfg" version = "1.5.1" @@ -65,6 +121,12 @@ version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + [[package]] name = "cc" version = "1.2.63" @@ -116,6 +178,52 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "clap" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -129,7 +237,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -162,6 +270,16 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "fs4" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c29c30684418547d476f0b48e84f4821639119c483b1eccd566c8cd0cd05f521" +dependencies = [ + "rustix 0.38.44", + "windows-sys 0.52.0", +] + [[package]] name = "futures-core" version = "0.3.32" @@ -216,6 +334,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "heph-core" version = "0.0.0" @@ -225,10 +349,28 @@ dependencies = [ "pulldown-cmark", "rrule", "rusqlite", + "serde", "thiserror 2.0.18", "ulid", ] +[[package]] +name = "hephd" +version = "0.0.0" +dependencies = [ + "anyhow", + "clap", + "fs4", + "heph-core", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -253,6 +395,18 @@ dependencies = [ "cc", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + [[package]] name = "js-sys" version = "0.3.99" @@ -288,6 +442,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -300,12 +460,41 @@ version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "616ec5685824bcc94416c6d4a7a446eea774a31efd7062c8480ba6fd06d7a6e5" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" +[[package]] +name = "mio" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.61.2", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -321,6 +510,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "parse-zoneinfo" version = "0.3.1" @@ -559,6 +754,19 @@ dependencies = [ "smallvec", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + [[package]] name = "rustix" version = "1.1.4" @@ -568,8 +776,8 @@ dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys", - "windows-sys", + "linux-raw-sys 0.12.1", + "windows-sys 0.61.2", ] [[package]] @@ -590,6 +798,58 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.150" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "2.0.1" @@ -614,6 +874,22 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.117" @@ -634,8 +910,8 @@ dependencies = [ "fastrand", "getrandom", "once_cell", - "rustix", - "windows-sys", + "rustix 1.1.4", + "windows-sys 0.61.2", ] [[package]] @@ -678,6 +954,102 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tokio" +version = "1.52.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" +dependencies = [ + "bytes", + "libc", + "mio", + "pin-project-lite", + "socket2", + "tokio-macros", + "windows-sys 0.61.2", +] + +[[package]] +name = "tokio-macros" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "ulid" version = "1.2.1" @@ -706,6 +1078,18 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -727,6 +1111,12 @@ dependencies = [ "libc", ] +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + [[package]] name = "wasip2" version = "1.0.3+wasi-0.2.9" @@ -850,6 +1240,24 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -859,6 +1267,70 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "wit-bindgen" version = "0.57.1" @@ -884,3 +1356,9 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index 4cb4bd6..8e20550 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["crates/heph-core"] +members = ["crates/heph-core", "crates/hephd"] [workspace.package] edition = "2021" @@ -18,6 +18,20 @@ anyhow = "1" pulldown-cmark = { version = "0.13", default-features = false } rrule = "0.13" chrono = { version = "0.4", default-features = false, features = ["clock"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = [ + "rt-multi-thread", + "net", + "io-util", + "macros", + "sync", + "time", +] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +clap = { version = "4", features = ["derive"] } +fs4 = "0.12" [profile.release] lto = "thin" diff --git a/crates/heph-core/Cargo.toml b/crates/heph-core/Cargo.toml index 9633ecb..3cb94c7 100644 --- a/crates/heph-core/Cargo.toml +++ b/crates/heph-core/Cargo.toml @@ -15,6 +15,7 @@ thiserror.workspace = true pulldown-cmark.workspace = true rrule.workspace = true chrono.workspace = true +serde.workspace = true [dev-dependencies] proptest = "1" diff --git a/crates/heph-core/src/model.rs b/crates/heph-core/src/model.rs index 1525396..28f8054 100644 --- a/crates/heph-core/src/model.rs +++ b/crates/heph-core/src/model.rs @@ -3,10 +3,13 @@ //! Every first-class entity is a [`Node`]. Tasks, links, recurrence, and the //! derived context-item index build on top of this base in later slices. +use serde::{Deserialize, Serialize}; + use crate::error::{Error, Result}; /// Discriminator for the kind of thing a node is (tech-spec §4.1). -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum NodeKind { /// Rich context document (knowledge base, work-logs). Body = markdown. Doc, @@ -46,7 +49,7 @@ impl NodeKind { } /// A persisted node (a row of the `nodes` table). -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Node { /// Stable, sync-safe id (ULID for content nodes; deterministic for journal/tag). pub id: String, @@ -69,7 +72,8 @@ pub struct Node { } /// A task's attention-state — the lived colour discipline ([[design]] §6.2). -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum Attention { /// Default — actionable once the do-date arrives. White, @@ -106,7 +110,8 @@ impl Attention { /// A committed task's lifecycle state (tech-spec §4.3). `done` and `dropped` /// are both "not outstanding"; the distinction is retained for honesty/history. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum TaskState { /// Still to be done. Outstanding, @@ -138,7 +143,8 @@ impl TaskState { } /// A typed, directional edge between two nodes (tech-spec §4.2). -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] pub enum LinkType { /// Materialized from a `[[link]]` in a body. Wiki, @@ -190,7 +196,7 @@ impl LinkType { } /// A persisted link (a row of the `links` table). -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Link { /// ULID id. pub id: String, @@ -207,7 +213,7 @@ pub struct Link { } /// A persisted committed task (a `tasks` row joined to its node id). -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Task { /// The id of the backing `task` node. pub node_id: String, @@ -225,7 +231,8 @@ pub struct Task { /// Input for creating a committed task. The canonical context `doc` and the /// `canonical-context` link are created automatically (tech-spec §6). -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] pub struct NewTask { /// Title (shared by the task node and its canonical context doc). pub title: String, @@ -242,13 +249,14 @@ pub struct NewTask { } /// Input for creating a node. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct NewNode { /// What kind of node to create. pub kind: NodeKind, /// Human-facing title. pub title: String, /// Optional markdown body. + #[serde(default)] pub body: Option, } diff --git a/crates/heph-core/src/ranking.rs b/crates/heph-core/src/ranking.rs index bb16120..e586829 100644 --- a/crates/heph-core/src/ranking.rs +++ b/crates/heph-core/src/ranking.rs @@ -14,11 +14,13 @@ use std::cmp::Ordering; +use serde::{Deserialize, Serialize}; + use crate::model::{Attention, TaskState}; /// A task as seen by the ranking engine — the candidacy fields plus the bits /// the Tactical output row shows. Used as both input and output of [`rank`]. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RankedTask { /// The task node id. pub node_id: String, diff --git a/crates/heph-core/src/sqlite/mod.rs b/crates/heph-core/src/sqlite/mod.rs index c91ad84..323304b 100644 --- a/crates/heph-core/src/sqlite/mod.rs +++ b/crates/heph-core/src/sqlite/mod.rs @@ -119,6 +119,11 @@ impl Store for LocalStore { nodes::update(&mut self.conn, &self.owner_id, now, id, title, body) } + fn tombstone_node(&mut self, id: &str) -> Result<()> { + let now = self.clock.now_ms(); + nodes::tombstone(&self.conn, now, id) + } + fn create_task(&mut self, input: NewTask) -> Result { let now = self.clock.now_ms(); tasks::create(&mut self.conn, &self.owner_id, now, input) diff --git a/crates/heph-core/src/sqlite/nodes.rs b/crates/heph-core/src/sqlite/nodes.rs index a955749..1f68eac 100644 --- a/crates/heph-core/src/sqlite/nodes.rs +++ b/crates/heph-core/src/sqlite/nodes.rs @@ -136,6 +136,19 @@ pub(super) fn update( Ok(node) } +/// Tombstone (soft-delete) a node. No hard deletes — tombstones keep merge +/// monotonic (tech-spec §4.3). +pub(super) fn tombstone(conn: &Connection, now: i64, id: &str) -> Result<()> { + let updated = conn.execute( + "UPDATE nodes SET tombstoned = 1, modified_at = ?1, hlc = ?2 WHERE id = ?3", + (now, hlc_for(now), id), + )?; + if updated == 0 { + return Err(Error::NodeNotFound(id.to_string())); + } + Ok(()) +} + /// Bump `modified_at`/`hlc` on a node (used when a task scalar field changes so /// the node's modified time reflects the mutation for sync ordering). pub(super) fn touch(conn: &Connection, now: i64, id: &str) -> Result<()> { diff --git a/crates/heph-core/src/store.rs b/crates/heph-core/src/store.rs index 854d39e..260c60a 100644 --- a/crates/heph-core/src/store.rs +++ b/crates/heph-core/src/store.rs @@ -33,6 +33,9 @@ pub trait Store { body: Option, ) -> Result; + /// Tombstone (soft-delete) a node. No hard deletes (tech-spec §4.3). + fn tombstone_node(&mut self, id: &str) -> Result<()>; + // --- tasks --- /// Create a committed task, auto-creating its canonical context `doc` and diff --git a/crates/hephd/Cargo.toml b/crates/hephd/Cargo.toml new file mode 100644 index 0000000..d9f751c --- /dev/null +++ b/crates/hephd/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "hephd" +description = "Hephaestus per-device daemon: owns the local store and serves surfaces over a unix socket." +edition.workspace = true +version.workspace = true +license.workspace = true +publish.workspace = true +authors.workspace = true +rust-version.workspace = true + +[lib] +name = "hephd" +path = "src/lib.rs" + +[[bin]] +name = "hephd" +path = "src/main.rs" + +[dependencies] +heph-core = { path = "../heph-core" } +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +anyhow.workspace = true +thiserror.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +clap.workspace = true +fs4.workspace = true + +[dev-dependencies] +tempfile = "3" diff --git a/crates/hephd/src/client.rs b/crates/hephd/src/client.rs new file mode 100644 index 0000000..c3c008b --- /dev/null +++ b/crates/hephd/src/client.rs @@ -0,0 +1,61 @@ +//! A minimal **synchronous** JSON-RPC client over the unix socket. +//! +//! Used by the `heph` CLI and by tests. Surfaces never touch SQLite directly +//! (tech-spec §3) — they go through the daemon socket, which this wraps. + +use std::io::{BufRead, BufReader, Write}; +use std::os::unix::net::UnixStream; +use std::path::Path; + +use anyhow::{bail, Context, Result}; +use serde_json::{json, Value}; + +use crate::rpc::Response; + +/// A connected client. One request/response per [`call`](Client::call). +pub struct Client { + reader: BufReader, + writer: UnixStream, + next_id: u64, +} + +impl Client { + /// Connect to a daemon listening at `socket_path`. + pub fn connect(socket_path: &Path) -> Result { + let stream = UnixStream::connect(socket_path) + .with_context(|| format!("connecting to hephd at {}", socket_path.display()))?; + let reader = BufReader::new(stream.try_clone()?); + Ok(Client { + reader, + writer: stream, + next_id: 1, + }) + } + + /// Call `method` with `params`, returning the `result` value (or an error + /// carrying the RPC error's code and message). + pub fn call(&mut self, method: &str, params: Value) -> Result { + let id = self.next_id; + self.next_id += 1; + + let mut line = serde_json::to_string(&json!({ + "id": id, + "method": method, + "params": params, + }))?; + line.push('\n'); + self.writer.write_all(line.as_bytes())?; + self.writer.flush()?; + + let mut response_line = String::new(); + let read = self.reader.read_line(&mut response_line)?; + if read == 0 { + bail!("hephd closed the connection"); + } + let response: Response = serde_json::from_str(&response_line)?; + if let Some(err) = response.error { + bail!("rpc error {}: {}", err.code, err.message); + } + Ok(response.result.unwrap_or(Value::Null)) + } +} diff --git a/crates/hephd/src/clock.rs b/crates/hephd/src/clock.rs new file mode 100644 index 0000000..602215c --- /dev/null +++ b/crates/hephd/src/clock.rs @@ -0,0 +1,20 @@ +//! The real system clock. +//! +//! `heph-core` never reads the ambient wall clock (tech-spec §2) — the daemon +//! injects it here. This is the one place a real `SystemTime::now()` lives. + +use std::time::{SystemTime, UNIX_EPOCH}; + +use heph_core::Clock; + +/// A [`Clock`] backed by the OS wall clock. +pub struct SystemClock; + +impl Clock for SystemClock { + fn now_ms(&self) -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) + } +} diff --git a/crates/hephd/src/lib.rs b/crates/hephd/src/lib.rs new file mode 100644 index 0000000..f54e0f6 --- /dev/null +++ b/crates/hephd/src/lib.rs @@ -0,0 +1,40 @@ +//! `hephd` — the Hephaestus per-device daemon. +//! +//! One binary, three modes (`local`/`server`/`client`); **slice 6 implements +//! `local`**. It owns the local SQLite handle (via [`heph_core::LocalStore`]), +//! takes the file's exclusive [lock](lock::LockGuard), and serves surfaces a +//! line-delimited JSON-RPC API over a unix socket ([`server::Daemon`]). The +//! query/mutation logic all lives in `heph-core`; this crate is transport, +//! locking, and (later) sync/auth. + +pub mod client; +pub mod clock; +pub mod lock; +pub mod rpc; +pub mod server; + +use std::path::PathBuf; + +pub use client::Client; +pub use clock::SystemClock; +pub use lock::LockGuard; +pub use server::Daemon; + +/// Default unix socket path: `$XDG_RUNTIME_DIR/heph/hephd.sock`, falling back to +/// the system temp dir when `XDG_RUNTIME_DIR` is unset (tech-spec §3). +pub fn default_socket_path() -> PathBuf { + let base = std::env::var_os("XDG_RUNTIME_DIR") + .map(PathBuf::from) + .unwrap_or_else(std::env::temp_dir); + base.join("heph").join("hephd.sock") +} + +/// Default store path: `$XDG_DATA_HOME/heph/heph.db`, falling back to +/// `$HOME/.local/share/heph/heph.db`. +pub fn default_db_path() -> PathBuf { + let base = std::env::var_os("XDG_DATA_HOME") + .map(PathBuf::from) + .or_else(|| std::env::var_os("HOME").map(|h| PathBuf::from(h).join(".local/share"))) + .unwrap_or_else(|| PathBuf::from(".")); + base.join("heph").join("heph.db") +} diff --git a/crates/hephd/src/lock.rs b/crates/hephd/src/lock.rs new file mode 100644 index 0000000..e087406 --- /dev/null +++ b/crates/hephd/src/lock.rs @@ -0,0 +1,91 @@ +//! Exclusive lock on a store's SQLite file (tech-spec §3.1). +//! +//! A `local` or `server` process takes the file's exclusive lock on open, so +//! **only one can own a given DB file at a time**. Kill the owner → the lock +//! releases → another process can open the same file (the "lock handoff"). A +//! `client` never opens the file, so it never contends. +//! +//! Implemented as an advisory `flock` on a sidecar `.lock` file (held for +//! the process lifetime). POSIX treats two opens of the same file in one +//! process independently, so a second [`acquire`](LockGuard::acquire) on the +//! same path fails just as a second process would. + +use std::fs::{File, OpenOptions}; +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use fs4::fs_std::FileExt; + +/// Holds the exclusive lock for as long as it lives; drops release it. +pub struct LockGuard { + _file: File, + path: PathBuf, +} + +impl LockGuard { + /// Acquire the exclusive lock for the store at `db_path`. Errors if another + /// `local`/`server` process already holds it. + pub fn acquire(db_path: &Path) -> Result { + let path = lock_path_for(db_path); + let file = OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(false) + .open(&path) + .with_context(|| format!("opening lock file {}", path.display()))?; + + // A failure here (typically `WouldBlock`) means another process holds + // the lock. + file.try_lock_exclusive().map_err(|e| { + anyhow::anyhow!( + "store {} is already locked by another hephd process: {e}", + db_path.display() + ) + })?; + Ok(LockGuard { _file: file, path }) + } + + /// The sidecar lock file path. + pub fn path(&self) -> &Path { + &self.path + } +} + +fn lock_path_for(db_path: &Path) -> PathBuf { + let mut name = db_path + .file_name() + .map(|n| n.to_os_string()) + .unwrap_or_default(); + name.push(".lock"); + db_path.with_file_name(name) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn second_acquire_on_same_path_fails_then_succeeds_after_release() { + let dir = tempfile::tempdir().unwrap(); + let db = dir.path().join("heph.db"); + + let first = LockGuard::acquire(&db).expect("first acquire"); + assert!( + LockGuard::acquire(&db).is_err(), + "second concurrent acquire must fail" + ); + + drop(first); + // Once released, a new acquire succeeds (the handoff). + let _again = LockGuard::acquire(&db).expect("acquire after release"); + } + + #[test] + fn lock_path_is_a_sidecar() { + assert_eq!( + lock_path_for(Path::new("/data/heph.db")), + PathBuf::from("/data/heph.db.lock") + ); + } +} diff --git a/crates/hephd/src/main.rs b/crates/hephd/src/main.rs new file mode 100644 index 0000000..5e0c1c1 --- /dev/null +++ b/crates/hephd/src/main.rs @@ -0,0 +1,61 @@ +//! `hephd` binary — starts the daemon in `local` mode (slice 6). + +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use clap::Parser; +use tokio::net::UnixListener; + +use heph_core::LocalStore; +use hephd::{default_db_path, default_socket_path, Daemon, LockGuard, SystemClock}; + +/// The Hephaestus per-device daemon. +#[derive(Parser, Debug)] +#[command(name = "hephd", version, about)] +struct Cli { + /// Path to the SQLite store file. + #[arg(long)] + db: Option, + + /// Path to the unix socket to listen on. + #[arg(long)] + socket: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + let cli = Cli::parse(); + let db = cli.db.unwrap_or_else(default_db_path); + let socket = cli.socket.unwrap_or_else(default_socket_path); + + if let Some(parent) = db.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("creating store dir {}", parent.display()))?; + } + if let Some(parent) = socket.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("creating socket dir {}", parent.display()))?; + } + + // Take the exclusive lock before opening the store (tech-spec §3.1). + let _lock = LockGuard::acquire(&db)?; + let store = LocalStore::open(&db, Box::new(SystemClock))?; + + // Replace any stale socket from a previous run, then bind. + if socket.exists() { + std::fs::remove_file(&socket) + .with_context(|| format!("removing stale socket {}", socket.display()))?; + } + let listener = UnixListener::bind(&socket) + .with_context(|| format!("binding socket {}", socket.display()))?; + + tracing::info!(db = %db.display(), socket = %socket.display(), "hephd local mode listening"); + Daemon::new(store).serve(listener).await +} diff --git a/crates/hephd/src/rpc.rs b/crates/hephd/src/rpc.rs new file mode 100644 index 0000000..ee87c7c --- /dev/null +++ b/crates/hephd/src/rpc.rs @@ -0,0 +1,227 @@ +//! JSON-RPC request/response types and the synchronous method dispatcher. +//! +//! The daemon speaks **line-delimited JSON-RPC** over a unix socket (tech-spec +//! §6, §10): one JSON object per line. [`dispatch`] is the pure, synchronous +//! heart — it maps a method name + params onto a [`heph_core::Store`] call and +//! is what the async transport runs on the blocking pool. The daemon is +//! **mode-agnostic**: Tactical/Strategic/Organizational are plugin-side +//! compositions of these primitives, not daemon concepts. + +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use heph_core::{Attention, NewNode, NewTask, Store, TaskState}; + +/// A JSON-RPC request line. +#[derive(Debug, Deserialize)] +pub struct Request { + /// Correlation id, echoed in the response (any JSON value). + #[serde(default)] + pub id: Value, + /// The method name (e.g. `task.create`). + pub method: String, + /// Method parameters (an object); defaults to null when omitted. + #[serde(default)] + pub params: Value, +} + +/// A JSON-RPC response line — exactly one of `result`/`error` is present. +#[derive(Debug, Serialize, Deserialize)] +pub struct Response { + /// The request id this answers. + pub id: Value, + /// The successful result. + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, + /// The error, if the call failed. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +impl Response { + /// A success response. + pub fn ok(id: Value, result: Value) -> Response { + Response { + id, + result: Some(result), + error: None, + } + } + + /// An error response. + pub fn failed(id: Value, error: RpcError) -> Response { + Response { + id, + result: None, + error: Some(error), + } + } +} + +/// A JSON-RPC error object. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RpcError { + /// Machine-readable code (JSON-RPC conventions where applicable). + pub code: i64, + /// Human-readable message. + pub message: String, +} + +// Standard JSON-RPC codes plus a couple of app codes. +/// The request line was not valid JSON. +pub const PARSE_ERROR: i64 = -32700; +/// Params failed to deserialize for the method. +pub const INVALID_PARAMS: i64 = -32602; +/// No such method. +pub const METHOD_NOT_FOUND: i64 = -32601; +/// A store/internal failure. +pub const INTERNAL_ERROR: i64 = -32603; +/// A referenced node was not found. +pub const NOT_FOUND: i64 = -32004; + +impl RpcError { + fn new(code: i64, message: impl Into) -> RpcError { + RpcError { + code, + message: message.into(), + } + } +} + +impl From for RpcError { + fn from(e: heph_core::Error) -> RpcError { + match e { + heph_core::Error::NodeNotFound(_) => RpcError::new(NOT_FOUND, e.to_string()), + other => RpcError::new(INTERNAL_ERROR, other.to_string()), + } + } +} + +fn parse(params: Value) -> Result { + serde_json::from_value(params).map_err(|e| RpcError::new(INVALID_PARAMS, e.to_string())) +} + +#[derive(Deserialize)] +struct IdParam { + id: String, +} + +#[derive(Deserialize)] +struct UpdateParams { + id: String, + #[serde(default)] + title: Option, + #[serde(default)] + body: Option, +} + +#[derive(Deserialize)] +struct SetStateParams { + id: String, + state: TaskState, +} + +#[derive(Deserialize)] +struct SetAttentionParams { + id: String, + attention: Attention, +} + +#[derive(Deserialize)] +struct NextParams { + #[serde(default)] + scope: Option, + #[serde(default)] + limit: Option, +} + +#[derive(Deserialize)] +struct LinkParams { + id: String, +} + +#[derive(Deserialize)] +struct LogAppendParams { + task_id: String, + text: String, +} + +#[derive(Deserialize)] +struct LogTailParams { + task_id: String, + #[serde(default)] + n: Option, +} + +/// Default `next`/`list` result size (tech-spec §6). +const DEFAULT_LIMIT: usize = 5; +/// Default `log.tail` size. +const DEFAULT_TAIL: usize = 10; + +/// Dispatch one method call against `store`. Synchronous — the transport runs +/// this on a blocking pool. +pub fn dispatch(store: &mut dyn Store, method: &str, params: Value) -> Result { + Ok(match method { + "node.get" => { + let p: IdParam = parse(params)?; + json!(store.get_node(&p.id)?) + } + "node.create" => { + let p: NewNode = parse(params)?; + json!(store.create_node(p)?) + } + "node.update" => { + let p: UpdateParams = parse(params)?; + json!(store.update_node(&p.id, p.title, p.body)?) + } + "node.tombstone" => { + let p: IdParam = parse(params)?; + store.tombstone_node(&p.id)?; + json!({ "ok": true }) + } + "task.create" => { + let p: NewTask = parse(params)?; + json!(store.create_task(p)?) + } + "task.set_state" => { + let p: SetStateParams = parse(params)?; + json!(store.set_task_state(&p.id, p.state)?) + } + "task.set_attention" => { + let p: SetAttentionParams = parse(params)?; + json!(store.set_task_attention(&p.id, p.attention)?) + } + "task.skip" => { + let p: IdParam = parse(params)?; + json!(store.skip_recurrence(&p.id)?) + } + "next" => { + let p: NextParams = parse(params)?; + json!(store.next(p.scope.as_deref(), p.limit.unwrap_or(DEFAULT_LIMIT))?) + } + "links.outgoing" => { + let p: LinkParams = parse(params)?; + json!(store.outgoing_links(&p.id)?) + } + "links.backlinks" => { + let p: LinkParams = parse(params)?; + json!(store.backlinks(&p.id)?) + } + "log.append" => { + let p: LogAppendParams = parse(params)?; + store.log_append(&p.task_id, &p.text)?; + json!({ "ok": true }) + } + "log.tail" => { + let p: LogTailParams = parse(params)?; + json!(store.log_tail(&p.task_id, p.n.unwrap_or(DEFAULT_TAIL))?) + } + other => { + return Err(RpcError::new( + METHOD_NOT_FOUND, + format!("unknown method: {other}"), + )) + } + }) +} diff --git a/crates/hephd/src/server.rs b/crates/hephd/src/server.rs new file mode 100644 index 0000000..f4c81b8 --- /dev/null +++ b/crates/hephd/src/server.rs @@ -0,0 +1,101 @@ +//! The async daemon: accepts unix-socket connections and serves the JSON-RPC +//! API by running [`rpc::dispatch`] on tokio's blocking pool. +//! +//! `heph-core` is synchronous and its SQLite handle is single-writer, so the +//! store sits behind an `Arc>`; each request locks it inside a +//! `spawn_blocking` task (DB calls never run on an async worker, tech-spec §3). + +use std::sync::{Arc, Mutex}; + +use anyhow::Result; +use serde_json::Value; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{UnixListener, UnixStream}; + +use heph_core::LocalStore; + +use crate::rpc::{self, Request, Response, RpcError, PARSE_ERROR}; + +/// A running daemon over a shared local store. +pub struct Daemon { + store: Arc>, +} + +impl Daemon { + /// Wrap an opened store. + pub fn new(store: LocalStore) -> Daemon { + Daemon { + store: Arc::new(Mutex::new(store)), + } + } + + /// Serve connections on `listener` until the task is cancelled. Each + /// connection is handled concurrently; all share the one store. + pub async fn serve(&self, listener: UnixListener) -> Result<()> { + loop { + let (stream, _addr) = listener.accept().await?; + let store = self.store.clone(); + tokio::spawn(async move { + if let Err(e) = handle_connection(stream, store).await { + tracing::debug!("connection closed: {e}"); + } + }); + } + } +} + +async fn handle_connection(stream: UnixStream, store: Arc>) -> Result<()> { + let (read_half, mut write_half) = stream.into_split(); + let mut lines = BufReader::new(read_half).lines(); + + while let Some(line) = lines.next_line().await? { + if line.trim().is_empty() { + continue; + } + let response = process_line(&line, &store).await; + let mut out = serde_json::to_string(&response)?; + out.push('\n'); + write_half.write_all(out.as_bytes()).await?; + write_half.flush().await?; + } + Ok(()) +} + +async fn process_line(line: &str, store: &Arc>) -> Response { + let request: Request = match serde_json::from_str(line) { + Ok(r) => r, + Err(e) => { + return Response::failed( + Value::Null, + RpcError { + code: PARSE_ERROR, + message: e.to_string(), + }, + ) + } + }; + + let id = request.id.clone(); + let store = store.clone(); + let method = request.method; + let params = request.params; + + // DB work runs on the blocking pool; the store mutex is held only there. + let dispatched = tokio::task::spawn_blocking(move || { + let mut guard = store.lock().expect("store mutex poisoned"); + rpc::dispatch(&mut *guard, &method, params) + }) + .await; + + match dispatched { + Ok(Ok(result)) => Response::ok(id, result), + Ok(Err(rpc_err)) => Response::failed(id, rpc_err), + Err(join_err) => Response::failed( + id, + RpcError { + code: rpc::INTERNAL_ERROR, + message: format!("dispatch task failed: {join_err}"), + }, + ), + } +} diff --git a/crates/hephd/tests/rpc_socket.rs b/crates/hephd/tests/rpc_socket.rs new file mode 100644 index 0000000..8951167 --- /dev/null +++ b/crates/hephd/tests/rpc_socket.rs @@ -0,0 +1,215 @@ +//! End-to-end daemon tests (tech-spec §9): a real `hephd` over a real unix +//! socket against a temp SQLite file, exercised by the sync client. Time is +//! clock-injected (FixedClock) so assertions are deterministic. + +use std::path::{Path, PathBuf}; +use std::thread; +use std::time::Duration; + +use serde_json::{json, Value}; +use tokio::net::UnixListener; + +use heph_core::{FixedClock, LocalStore}; +use hephd::{Client, Daemon}; + +const JAN1: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z +const ONE_DAY: i64 = 86_400_000; +const NOW: i64 = JAN1 + ONE_DAY / 2; + +/// Start a daemon on its own thread+runtime against a temp DB and socket. +/// Returns the socket path; the returned `TempDir` keeps the files alive. +fn spawn_daemon() -> (PathBuf, tempfile::TempDir) { + let dir = tempfile::tempdir().unwrap(); + let db = dir.path().join("heph.db"); + let socket = dir.path().join("d.sock"); + + let store = LocalStore::open(&db, Box::new(FixedClock(NOW))).unwrap(); + let socket_for_thread = socket.clone(); + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + let listener = UnixListener::bind(&socket_for_thread).unwrap(); + let _ = Daemon::new(store).serve(listener).await; + }); + }); + + // Wait for the socket to appear. + for _ in 0..200 { + if socket.exists() { + break; + } + thread::sleep(Duration::from_millis(5)); + } + (socket, dir) +} + +fn client(socket: &Path) -> Client { + Client::connect(socket).unwrap() +} + +#[test] +fn node_create_and_get_round_trip_over_socket() { + let (socket, _dir) = spawn_daemon(); + let mut c = client(&socket); + + let created = c + .call( + "node.create", + json!({ "kind": "doc", "title": "Roof log", "body": "# Roof" }), + ) + .unwrap(); + assert_eq!(created["kind"], "doc"); + assert_eq!(created["title"], "Roof log"); + // Clock injection: created_at is the daemon's FixedClock value. + assert_eq!(created["created_at"], NOW); + + let id = created["id"].as_str().unwrap(); + let fetched = c.call("node.get", json!({ "id": id })).unwrap(); + assert_eq!(fetched, created); + + // A missing node is JSON null, not an error. + let missing = c.call("node.get", json!({ "id": "nope" })).unwrap(); + assert_eq!(missing, Value::Null); +} + +#[test] +fn task_create_appears_in_next_with_context_link() { + let (socket, _dir) = spawn_daemon(); + let mut c = client(&socket); + + let task = c + .call( + "task.create", + json!({ "title": "Fix the roof leak", "attention": "red" }), + ) + .unwrap(); + let task_id = task["node_id"].as_str().unwrap().to_string(); + + let ranked = c.call("next", json!({ "limit": 5 })).unwrap(); + let arr = ranked.as_array().unwrap(); + assert_eq!(arr.len(), 1); + assert_eq!(arr[0]["node_id"], task_id); + assert_eq!(arr[0]["attention"], "red"); + assert!(arr[0]["canonical_context_id"].is_string()); + + // The canonical-context link is present and points at a doc. + let links = c.call("links.outgoing", json!({ "id": task_id })).unwrap(); + let ctx = links + .as_array() + .unwrap() + .iter() + .find(|l| l["link_type"] == "canonical-context") + .expect("canonical-context link"); + let doc = c.call("node.get", json!({ "id": ctx["dst_id"] })).unwrap(); + assert_eq!(doc["kind"], "doc"); +} + +#[test] +fn errors_are_reported_as_rpc_errors() { + let (socket, _dir) = spawn_daemon(); + let mut c = client(&socket); + + // Unknown method. + let err = c.call("does.not.exist", json!({})).unwrap_err(); + assert!(err.to_string().contains("unknown method"), "{err}"); + + // set_state on a non-existent task → NotFound error. + let err = c + .call( + "task.set_state", + json!({ "id": "missing", "state": "done" }), + ) + .unwrap_err(); + assert!(err.to_string().contains("not found"), "{err}"); + + // Bad params (missing a genuinely-required field) → invalid params. + // (`node.create` needs `kind`; `NewTask` defaults all fields, so an empty + // `task.create` is valid, not an error.) + let err = c + .call("node.create", json!({ "title": "no kind" })) + .unwrap_err(); + assert!(err.to_string().contains("missing field"), "{err}"); +} + +#[test] +fn recurring_task_rolls_forward_over_rpc() { + let (socket, _dir) = spawn_daemon(); + let mut c = client(&socket); + + let task = c + .call( + "task.create", + json!({ + "title": "Morning routine", + "do_date": JAN1, + "recurrence": "FREQ=DAILY", + }), + ) + .unwrap(); + let task_id = task["node_id"].as_str().unwrap().to_string(); + + // Find the canonical context doc and put a checked checklist in it. + let links = c.call("links.outgoing", json!({ "id": task_id })).unwrap(); + let doc_id = links + .as_array() + .unwrap() + .iter() + .find(|l| l["link_type"] == "canonical-context") + .unwrap()["dst_id"] + .as_str() + .unwrap() + .to_string(); + c.call( + "node.update", + json!({ "id": doc_id, "body": "- [x] brush teeth\n- [x] coffee\n" }), + ) + .unwrap(); + + // Complete the occurrence → rolls forward. + let rolled = c + .call("task.set_state", json!({ "id": task_id, "state": "done" })) + .unwrap(); + assert_eq!(rolled["state"], "outstanding"); + assert_eq!(rolled["do_date"], JAN1 + ONE_DAY); + + // Fresh checklist; completion did not carry forward. + let doc = c.call("node.get", json!({ "id": doc_id })).unwrap(); + assert_eq!(doc["body"], "- [ ] brush teeth\n- [ ] coffee\n"); + + // The completion is in the log. + let log = c + .call("log.tail", json!({ "task_id": task_id, "n": 10 })) + .unwrap(); + assert_eq!(log.as_array().unwrap().len(), 1); +} + +#[test] +fn multiple_clients_concurrently_create_tasks() { + let (socket, _dir) = spawn_daemon(); + const N: usize = 8; + + let handles: Vec<_> = (0..N) + .map(|i| { + let socket = socket.clone(); + thread::spawn(move || { + let mut c = Client::connect(&socket).unwrap(); + c.call( + "task.create", + json!({ "title": format!("task {i}"), "attention": "orange" }), + ) + .unwrap(); + }) + }) + .collect(); + for h in handles { + h.join().unwrap(); + } + + // A fresh client sees all N tasks ranked. + let mut c = client(&socket); + let ranked = c.call("next", json!({ "limit": 100 })).unwrap(); + assert_eq!(ranked.as_array().unwrap().len(), N); +} diff --git a/docs/changelog.d/v1-prototype.feature.md b/docs/changelog.d/v1-prototype.feature.md index 2ac879e..c0a213f 100644 --- a/docs/changelog.d/v1-prototype.feature.md +++ b/docs/changelog.d/v1-prototype.feature.md @@ -5,4 +5,5 @@ Begin the v1 prototype (Phase 1, tech-spec §11.1), built in TDD slices: - Committed tasks (§4.3, §6): `task.create` auto-creates the canonical context `doc` + `canonical-context` link; attention/do-date/late-on/state/recurrence columns; set-state/set-attention. Links CRUD (outgoing/backlinks). A body update reconciles `wiki` links (diff-based, resolved by alias/title, idempotent). - "What is next?" ranking (§7): pure, clock-injected, two-stage engine — candidacy filter (do-date as a boolean gate only) then a reorderable list of named dimensions (past-late-on → overdue-amount → attention band → FIFO). `late_on` is the sole urgency signal; blue hidden; red always shown. Proptest-checked total order. `Store::next` surfaces it over SQLite. - Recurrence — roll-forward in place (§4.4): completing a recurring task resets its checklist to all-unchecked, logs the occurrence, and advances the do-date to the next RRULE instance after now (skipping misses) — completion never carries forward (proptest-checked). Per-task append-only logs (`log-of`) with `log.append`/`log.tail`; `skip` advances without logging. +- `hephd` daemon, local mode (§3, §6): exclusive file lock (handoff-ready), line-delimited JSON-RPC over a unix socket exposing the node/task/next/links/log methods, with DB work on tokio's blocking pool. Synchronous client for surfaces/CLI. Model types are serde-serializable. - CI runs the Rust suite (fmt/clippy/test) via the project build hook.