hephaestus/heph.nvim/lua/heph/rpc.lua
Erich Blume e3db2ac550 heph.nvim: plug-and-play managed daemon (autostart, self-heal, client/server guardrail)
The plugin now manages its own hephd by default (autostart = true): if nothing
is serving the socket it spawns a local daemon against the default XDG paths,
kills only what it spawned on VimLeavePre, and self-heals — rpc.call retries
once through a respawn hook when the connection drops (the prior owner releases
the DB lock on exit, so a respawn can claim it).

- daemon.ensure() connects to an already-running daemon (any mode) or spawns one
  we own; stop_spawned()/is_managed() track lifecycle.
- A server/client daemon you started is always respected (spawn only when nothing
  serves the socket). autostart = false → connect-only, warns/errors if down,
  and clears the self-heal hook so it fails loudly.
- config: autostart defaults true; new `db` option; $HEPH_SOCKET / $HEPH_DB
  fallbacks isolate a dev Neovim onto a separate daemon + DB.

e2e: managed_daemon_spec covers autostart spawn, self-heal-after-kill, and
connect-only error. 10 specs green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 09:37:49 -07:00

227 lines
6.6 KiB
Lua

--- Line-delimited JSON-RPC client over hephd's unix socket (tech-spec §6).
---
--- The daemon speaks one JSON object per line: a request `{id, method, params}`
--- gets exactly one response line `{id, result}` xor `{id, error}`. We talk to
--- it over a libuv pipe and expose a **blocking** `call()` by pumping the event
--- loop with `vim.wait` until the matching id returns — synchronous ergonomics
--- over an async transport, which is what every surface call and the e2e tests
--- want.
---
--- A `Session` is one connection; the module keeps a default singleton for the
--- plugin and lets tests open isolated sessions (`new_session`) so an assertion
--- never shares state with the buffer under test.
-- libuv bindings: use `vim.uv` when this Neovim provides it, otherwise fall
-- back to `vim.loop`.
local uv = vim.uv or vim.loop
--- Session "class" table: instances resolve their methods through it.
local Session = {}
Session.__index = Session

--- Build a session for `socket_path` without opening the connection; the
--- pipe is connected lazily by the first call that needs it.
function Session.new(socket_path)
  local self = {
    socket_path = socket_path,
    buf = "", -- received bytes that do not yet end in a newline
    pending = {}, -- in-flight calls keyed by request id: { done, result, err }
    next_id = 0, -- last request id handed out
    connected = false,
  }
  -- `pipe` is left unset (nil) until a connection is established.
  return setmetatable(self, Session)
end
--- Append `chunk` to the read buffer and hand every complete
--- newline-terminated line to `_dispatch`; a trailing partial line stays
--- buffered for the next chunk. Empty lines are skipped. Called from the pipe
--- read callback, so it sticks to string/table work.
function Session:_on_bytes(chunk)
  self.buf = self.buf .. chunk
  local newline_at = string.find(self.buf, "\n", 1, true)
  while newline_at do
    local complete = string.sub(self.buf, 1, newline_at - 1)
    -- Consume the line (and its terminator) before dispatching it.
    self.buf = string.sub(self.buf, newline_at + 1)
    if complete ~= "" then
      self:_dispatch(complete)
    end
    newline_at = string.find(self.buf, "\n", 1, true)
  end
end
--- Match one response line to its pending call by id and complete that call.
---
--- Lines that are not valid JSON, do not decode to a table, carry no `id`, or
--- name an id with no pending call are dropped silently (a line with no id is
--- a server notification — tech-spec §6, slice 11d — ignored for now).
---
--- On a response with an `error` member the slot gets an `err` string; an
--- object error is rendered as `rpc error <code>: <message>`, and any other
--- non-nil error value is rendered via `tostring` instead of being indexed, so
--- a malformed error can never throw inside the read callback and leave the
--- caller waiting for its full timeout.
---@param line string one complete line, without the trailing newline
function Session:_dispatch(line)
  -- `luanil` decodes JSON null to Lua nil (not the vim.NIL sentinel), so a
  -- `null` result / nullable field reads as a plain absent value.
  local ok, msg = pcall(vim.json.decode, line, { luanil = { object = true, array = true } })
  if not ok or type(msg) ~= "table" or msg.id == nil then
    return
  end
  local slot = self.pending[msg.id]
  if not slot then
    return
  end
  local failure = msg.error
  if failure == nil then
    slot.result = msg.result
  elseif type(failure) == "table" then
    slot.err = string.format("rpc error %s: %s", tostring(failure.code), tostring(failure.message))
  else
    -- Non-object error (string, number, boolean): report it verbatim.
    slot.err = "rpc error: " .. tostring(failure)
  end
  slot.done = true
end
--- Mark the session disconnected and complete every unfinished pending call
--- with `reason` as its error, so blocked waiters wake up at once instead of
--- running out their timeouts. The pipe handle is detached immediately and
--- closed later via `vim.schedule`; this function itself only writes to
--- tables, which keeps it usable from the read callback.
function Session:_fail_all(reason)
  self.connected = false
  for _, waiter in pairs(self.pending) do
    if not waiter.done then
      waiter.err = reason
      waiter.done = true
    end
  end

  local detached = self.pipe
  self.pipe = nil
  if not detached then
    return
  end
  vim.schedule(function()
    -- Best effort: a failure while closing the handle is ignored.
    pcall(function()
      detached:close()
    end)
  end)
end
--- Connect if not already connected (no-op otherwise). Blocks, pumping the
--- event loop with `vim.wait`, until the connect callback fires or 5000ms
--- elapse.
---
--- Raises if no socket path is configured, if the connect attempt times out,
--- or if libuv reports a connect error; in the latter two cases the new pipe
--- is closed before raising.
---
--- On success the read buffer is reset and reading starts. The read callback
--- only acts while the pipe it belongs to is still the session's current
--- pipe: `_fail_all` detaches a pipe at once but closes it later, so without
--- this check a late EOF, error, or chunk from a superseded pipe could tear
--- down or corrupt a connection opened afterwards.
function Session:_ensure()
  if self.connected then
    return
  end
  assert(self.socket_path, "heph: no socket configured (call require('heph').setup{ socket = ... })")
  local pipe = uv.new_pipe(false)
  local done, cerr = false, nil
  pipe:connect(self.socket_path, function(e)
    cerr = e
    done = true
  end)
  if not vim.wait(5000, function()
    return done
  end, 10) then
    pcall(function()
      pipe:close()
    end)
    error("heph: timed out connecting to hephd at " .. self.socket_path)
  end
  if cerr then
    pcall(function()
      pipe:close()
    end)
    error("heph: cannot connect to hephd at " .. self.socket_path .. ": " .. cerr)
  end
  self.pipe = pipe
  self.buf = ""
  self.connected = true
  pipe:read_start(function(rerr, chunk)
    if self.pipe ~= pipe then
      -- Stale callback from a pipe this session has already dropped.
      return
    end
    if rerr then
      self:_fail_all("connection error: " .. rerr)
    elseif chunk == nil then
      -- A nil chunk without an error is end-of-stream.
      self:_fail_all("hephd closed the connection")
    else
      self:_on_bytes(chunk)
    end
  end)
end
--- Call `method` with `params`, blocking until the response arrives.
---
--- Connects first if needed. Raises a Lua error when the daemon answers with
--- an rpc error, when the connection fails (including a failed write), or
--- when no response arrives within the timeout.
---@param method string rpc method name
---@param params table|nil request parameters; nil or an empty table is sent as `{}`
---@param opts table|nil `opts.timeout` in milliseconds, default 5000
---@return any result the `result` member of the response
function Session:call(method, params, opts)
  opts = opts or {}
  self:_ensure()
  self.next_id = self.next_id + 1
  local id = self.next_id
  -- Empty params must serialize as `{}`, not `[]` (the daemon parses an object).
  if params == nil or (type(params) == "table" and vim.tbl_isempty(params)) then
    params = vim.empty_dict()
  end
  -- Encode before registering the slot: if encoding raises (unserializable
  -- params) no orphaned entry is left behind in `pending`.
  local line = vim.json.encode({ id = id, method = method, params = params }) .. "\n"

  local slot = { done = false }
  self.pending[id] = slot
  local req, werr = self.pipe:write(line, function(async_err)
    -- Asynchronous write failure: finish this call now instead of letting it
    -- wait out the timeout. Table writes only (libuv callback context).
    if async_err and not slot.done then
      slot.err = "connection error: " .. tostring(async_err)
      slot.done = true
    end
  end)
  if not req then
    -- The write was rejected outright; the pipe is unusable, so drop it and
    -- let the next call reconnect.
    self.pending[id] = nil
    self:_fail_all("connection error: " .. tostring(werr))
    error("heph: connection error: " .. tostring(werr))
  end

  local ok = vim.wait(opts.timeout or 5000, function()
    return slot.done
  end, 5)
  self.pending[id] = nil
  if not ok then
    error("heph: rpc timeout calling " .. method)
  end
  if slot.err then
    error("heph: " .. slot.err)
  end
  return slot.result
end
--- Shut the session down: every in-flight call fails with "connection closed"
--- and the pipe, if any, is released.
function Session:close()
  local reason = "connection closed"
  self:_fail_all(reason)
end
--- Public module table; `Session` is exported alongside the module-level
--- helpers defined below.
local M = { Session = Session }
--- Point the default singleton at `socket_path`. An existing default session
--- is closed first; the replacement starts unconnected and is returned.
function M.setup(socket_path)
  local previous = M._default
  if previous then
    previous:close()
  end
  local replacement = Session.new(socket_path)
  M._default = replacement
  return replacement
end
--- Return the default singleton session, creating an unconnected one with no
--- socket path the first time it is requested.
function M.session()
  local current = M._default
  if not current then
    current = Session.new(nil)
    M._default = current
  end
  return current
end
--- Install the self-heal hook: a function that (re)ensures the daemon, run
--- once before the single retry of a call that failed on a dropped
--- connection. Passing `nil` turns self-heal off (used when autostart is off,
--- so a connect-only setup fails loudly instead of respawning).
function M.set_respawn(fn)
  local hook = fn
  M._respawn = hook
end
--- Decide whether an error message from a session call describes a dead or
--- unreachable connection (as opposed to an answer from the daemon).
---
--- A message carrying the "heph: rpc error " marker is an application error
--- the daemon itself reported, so the connection is alive; it is never
--- treated as a connection error, even if its text happens to mention
--- "connect" or "timeout". Otherwise the message counts as a connection error
--- when it contains "connect" (which also covers "connection" and
--- "connecting") or "timeout".
---@param msg any error value; converted with `tostring`
---@return boolean
local function is_connection_error(msg)
  msg = tostring(msg)
  if msg:find("heph: rpc error ", 1, true) then
    return false
  end
  return msg:find("connect", 1, true) ~= nil or msg:find("timeout", 1, true) ~= nil
end
--- Blocking call on the default session, with one self-heal retry.
---
--- If the first attempt fails with what looks like a connection error and a
--- respawn hook is set, the hook is run (its own failure is ignored), the dead
--- connection is dropped, and the call is retried exactly once (the prior
--- owner releases the DB lock on exit, so a respawn can claim it). Any other
--- failure is re-raised with its original message.
---@param method string rpc method name
---@param params table|nil request parameters
---@param opts table|nil forwarded to the session call (e.g. `timeout`)
---@return any result
function M.call(method, params, opts)
  local session = M.session()
  local ok, result = pcall(session.call, session, method, params, opts)
  if ok then
    return result
  end
  if M._respawn and is_connection_error(result) then
    pcall(M._respawn)
    -- Look the default session up again rather than reusing `session`, in
    -- case the hook rebound it.
    M.session():close() -- drop the dead connection so the retry reconnects
    return M.session():call(method, params, opts)
  end
  -- Level 0: the message already carries the position of the original
  -- failure; do not prepend a second file:line prefix.
  error(result, 0)
end
--- Create a fresh, unconnected session for `socket_path` that is independent
--- of the default singleton — used by tests for independent assertions.
function M.new_session(socket_path)
  local isolated = Session.new(socket_path)
  return isolated
end
--- Close the default session if one exists. The session object itself is
--- kept as the default.
function M.close()
  local current = M._default
  if current then
    current:close()
  end
end
return M