--- Line-delimited JSON-RPC client over hephd's unix socket (tech-spec §6).
---
--- The daemon speaks one JSON object per line: a request `{id, method, params}`
--- gets exactly one response line `{id, result}` xor `{id, error}`. We talk to
--- it over a libuv pipe and expose a **blocking** `call()` by pumping the event
--- loop with `vim.wait` until the matching id returns — synchronous ergonomics
--- over an async transport, which is what every surface call and the e2e tests
--- want.
---
--- A `Session` is one connection; the module keeps a default singleton for the
--- plugin and lets tests open isolated sessions (`new_session`) so an assertion
--- never shares state with the buffer under test.

local uv = vim.uv or vim.loop

local Session = {}
Session.__index = Session

--- Create an unconnected session bound to `socket_path` (lazy connect).
--- @param socket_path string|nil path of the unix socket; `nil` makes the
---   first connect attempt fail its assertion.
--- @return table session a new `Session` instance
function Session.new(socket_path)
  return setmetatable({
    socket_path = socket_path,
    pipe = nil,
    buf = "", -- partial-line accumulator
    pending = {}, -- [id] = { done, result, err }
    next_id = 0,
    connected = false,
  }, Session)
end

--- Drain complete `\n`-terminated lines out of the read buffer. Runs in the
--- libuv fast-event context: string/table ops only, never `vim.api`/`vim.fn`.
--- @param chunk string bytes just read from the pipe
function Session:_on_bytes(chunk)
  local data = self.buf .. chunk
  -- Walk the data with a cursor and slice the remainder once at the end,
  -- instead of re-copying the tail of the buffer for every line.
  local start = 1
  while true do
    local nl = data:find("\n", start, true)
    if not nl then
      break
    end
    -- Skip empty lines (nl == start means nothing precedes the newline).
    if nl > start then
      self:_dispatch(data:sub(start, nl - 1))
    end
    start = nl + 1
  end
  -- Whatever follows the last newline is an incomplete line; keep it.
  self.buf = data:sub(start)
end

--- Match one response line to its pending call by id. A line with no id is a
--- server notification (tech-spec §6, slice 11d) — ignored for now.
--- Undecodable lines and ids with no pending call are dropped silently.
--- @param line string one complete line, without its trailing newline
function Session:_dispatch(line)
  -- `luanil` decodes JSON null to Lua nil (not the vim.NIL sentinel), so a
  -- `null` result / nullable field reads as a plain absent value.
  local ok, msg = pcall(vim.json.decode, line, { luanil = { object = true, array = true } })
  if not ok or type(msg) ~= "table" or msg.id == nil then
    return
  end
  local slot = self.pending[msg.id]
  if not slot then
    return
  end
  local rpc_err = msg.error
  if rpc_err ~= nil then
    if type(rpc_err) == "table" then
      slot.err = string.format("rpc error %s: %s", tostring(rpc_err.code), tostring(rpc_err.message))
    else
      -- A malformed (non-object) `error` must not throw inside the read
      -- callback: indexing a number/boolean would raise there and leave the
      -- caller blocked until its timeout.
      slot.err = "rpc error: " .. tostring(rpc_err)
    end
  else
    slot.result = msg.result
  end
  slot.done = true
end

--- Fail every outstanding call so blocked `vim.wait`s unblock immediately
--- rather than each waiting out its full timeout. Safe to call from the read
--- callback (fast-event context): only touches tables and `vim.schedule`.
--- @param reason string error text stored on every unfinished slot
function Session:_fail_all(reason)
  self.connected = false
  for _, slot in pairs(self.pending) do
    if not slot.done then
      slot.err = reason
      slot.done = true
    end
  end
  -- Detach the pipe now; the actual close is deferred via `vim.schedule`.
  local pipe = self.pipe
  self.pipe = nil
  if pipe then
    vim.schedule(function()
      pcall(function()
        pipe:close()
      end)
    end)
  end
end

--- Connect (idempotent). Blocks until the connect callback fires.
--- Raises if no socket path is configured, the pipe cannot be created, the
--- connect does not complete within 5000ms, or the connect itself fails.
function Session:_ensure()
  if self.connected then
    return
  end
  assert(self.socket_path, "heph: no socket configured (call require('heph').setup{ socket = ... })")

  local pipe, perr = uv.new_pipe(false)
  if not pipe then
    error("heph: cannot create pipe: " .. tostring(perr))
  end

  local done, cerr = false, nil
  pipe:connect(self.socket_path, function(e)
    cerr = e
    done = true
  end)
  if not vim.wait(5000, function()
    return done
  end, 10) then
    pcall(function()
      pipe:close()
    end)
    error("heph: timed out connecting to hephd at " .. self.socket_path)
  end
  if cerr then
    pcall(function()
      pipe:close()
    end)
    error("heph: cannot connect to hephd at " .. self.socket_path .. ": " .. cerr)
  end

  self.pipe = pipe
  self.buf = ""
  self.connected = true
  pipe:read_start(function(rerr, chunk)
    -- Ignore callbacks from a pipe this session no longer owns. `_fail_all`
    -- detaches the pipe immediately but only *schedules* its close, so a
    -- stale pipe could otherwise deliver EOF/bytes after a reconnect and
    -- tear down (or corrupt the buffer of) the new connection.
    if self.pipe ~= pipe then
      return
    end
    if rerr then
      self:_fail_all("connection error: " .. rerr)
    elseif chunk == nil then
      self:_fail_all("hephd closed the connection")
    else
      self:_on_bytes(chunk)
    end
  end)
end

--- Call `method` with `params`, blocking until the response. Raises a Lua error
--- on an rpc error or timeout. `opts.timeout` defaults to 5000ms.
function Session:call(method, params, opts)
  -- Parameters:
  --   method (string)    rpc method name, sent as the request's `method`.
  --   params (table|nil) request params; nil or an empty table is sent as `{}`.
  --   opts   (table|nil) `opts.timeout` is the wait in ms (default 5000).
  -- Returns the response's `result`. Raises on a connect failure, a write
  -- failure, an rpc error, a dropped connection, or a timeout.
  opts = opts or {}
  self:_ensure()

  -- Empty params must serialize as `{}`, not `[]` (the daemon parses an object).
  if params == nil or (type(params) == "table" and vim.tbl_isempty(params)) then
    params = vim.empty_dict()
  end

  self.next_id = self.next_id + 1
  local id = self.next_id
  -- Encode *before* registering the slot: if `params` is not serializable the
  -- encode raises, and nothing must be left behind in `pending`.
  local line = vim.json.encode({ id = id, method = method, params = params }) .. "\n"

  local slot = { done = false }
  self.pending[id] = slot

  -- Surface write failures immediately instead of waiting out the timeout.
  -- The callback runs in the libuv callback context, so it only touches the
  -- slot table.
  local wrote, werr = self.pipe:write(line, function(async_err)
    if async_err and not slot.done then
      slot.err = "connection error: " .. tostring(async_err)
      slot.done = true
    end
  end)
  if not wrote then
    self.pending[id] = nil
    error("heph: connection error: " .. tostring(werr))
  end

  local ok = vim.wait(opts.timeout or 5000, function()
    return slot.done
  end, 5)
  self.pending[id] = nil
  if not ok then
    error("heph: rpc timeout calling " .. method)
  end
  if slot.err then
    error("heph: " .. slot.err)
  end
  return slot.result
end

--- Close the connection, failing any in-flight calls.
function Session:close()
  self:_fail_all("connection closed")
end

local M = { Session = Session }

--- (Re)bind the default singleton session to `socket_path`.
--- Any existing default session is closed first.
--- @param socket_path string path of hephd's unix socket
--- @return table session the new (still unconnected) default session
function M.setup(socket_path)
  if M._default then
    M._default:close()
  end
  M._default = Session.new(socket_path)
  return M._default
end

--- The default singleton session (created unconnected if absent).
--- @return table session
function M.session()
  if not M._default then
    M._default = Session.new(nil)
  end
  return M._default
end

--- Register a hook that (re)ensures the daemon — called once to self-heal a
--- dropped connection before a single retry. `nil` disables self-heal (used when
--- autostart is off, so a connect-only setup fails loudly instead of respawning).
--- @param fn function|nil the respawn hook
function M.set_respawn(fn)
  M._respawn = fn
end

--- Heuristic: does this error text look like a transport failure (as opposed
--- to an rpc-level error)? Plain substring search, no patterns.
--- @param msg any error value; stringified before matching
--- @return boolean
local function is_connection_error(msg)
  msg = tostring(msg)
  -- "connect" also matches every message containing "connection".
  return msg:find("connect", 1, true) ~= nil or msg:find("timeout", 1, true) ~= nil
end

--- Blocking call on the default session. If the call fails because the
--- connection is dead and a respawn hook is set, ensure the daemon and retry
--- once (the prior owner releases the DB lock on exit, so a respawn can claim it).
function M.call(method, params, opts)
  -- Parameters are forwarded unchanged to the default session's `call`.
  -- Returns the rpc result; raises the underlying error if the call (or the
  -- single retry) fails.
  local session = M.session()
  local ok, result = pcall(session.call, session, method, params, opts)
  if ok then
    return result
  end
  if M._respawn and is_connection_error(result) then
    -- Best-effort: a failing hook is ignored and the retry below reports the
    -- real connection problem.
    pcall(M._respawn)
    -- Re-fetch the session rather than reusing `session`: the hook is free to
    -- rebind the default (e.g. via `M.setup`).
    M.session():close() -- drop the dead connection so the retry reconnects
    return M.session():call(method, params, opts)
  end
  -- Level 0: `result` already carries the position of the original error;
  -- the default level would prepend a second `file:line:` prefix.
  error(result, 0)
end

--- An isolated session for a socket — used by tests for independent assertions.
--- @param socket_path string path of the unix socket
--- @return table session a new, unconnected `Session`
function M.new_session(socket_path)
  return Session.new(socket_path)
end

--- Close the default session.
--- No-op when no default session has been created.
function M.close()
  if M._default then
    M._default:close()
  end
end

return M