hephaestus/heph.nvim/lua/heph/rpc.lua

219 lines
6.3 KiB
Lua
Raw Normal View History

--- Line-delimited JSON-RPC client over hephd's unix socket (tech-spec §6).
---
--- The daemon speaks one JSON object per line: a request `{id, method, params}`
--- gets exactly one response line `{id, result}` xor `{id, error}`. We talk to
--- it over a libuv pipe and expose a **blocking** `call()` by pumping the event
--- loop with `vim.wait` until the matching id returns — synchronous ergonomics
--- over an async transport, which is what every surface call and the e2e tests
--- want.
---
--- A `Session` is one connection; the module keeps a default singleton for the
--- plugin and lets tests open isolated sessions (`new_session`) so an assertion
--- never shares state with the buffer under test.
local uv = vim.uv or vim.loop
local Session = {}
Session.__index = Session
--- Create an unconnected session bound to `socket_path` (lazy connect).
function Session.new(socket_path)
return setmetatable({
socket_path = socket_path,
pipe = nil,
buf = "", -- partial-line accumulator
pending = {}, -- [id] = { done, result, err }
next_id = 0,
connected = false,
}, Session)
end
--- Drain complete `\n`-terminated lines out of the read buffer. Runs in the
--- libuv fast-event context: string/table ops only, never `vim.api`/`vim.fn`.
function Session:_on_bytes(chunk)
self.buf = self.buf .. chunk
while true do
local nl = self.buf:find("\n", 1, true)
if not nl then
break
end
local line = self.buf:sub(1, nl - 1)
self.buf = self.buf:sub(nl + 1)
if #line > 0 then
self:_dispatch(line)
end
end
end
--- Match one response line to its pending call by id. A line with no id is a
--- server notification (tech-spec §6, slice 11d) — ignored for now.
function Session:_dispatch(line)
-- `luanil` decodes JSON null to Lua nil (not the vim.NIL sentinel), so a
-- `null` result / nullable field reads as a plain absent value.
local ok, msg = pcall(vim.json.decode, line, { luanil = { object = true, array = true } })
if not ok or type(msg) ~= "table" or msg.id == nil then
return
end
local slot = self.pending[msg.id]
if not slot then
return
end
if msg.error ~= nil then
slot.err = string.format("rpc error %s: %s", tostring(msg.error.code), tostring(msg.error.message))
else
slot.result = msg.result
end
slot.done = true
end
--- Fail every outstanding call so blocked `vim.wait`s unblock immediately
--- rather than each waiting out its full timeout. Safe to call from the read
--- callback (fast-event context): only touches tables and `vim.schedule`.
function Session:_fail_all(reason)
self.connected = false
for _, slot in pairs(self.pending) do
if not slot.done then
slot.err = reason
slot.done = true
end
end
local pipe = self.pipe
self.pipe = nil
if pipe then
vim.schedule(function()
pcall(function()
pipe:close()
end)
end)
end
end
--- Connect (idempotent). Blocks until the connect callback fires.
function Session:_ensure()
if self.connected then
return
end
assert(self.socket_path, "heph: no socket configured (call require('heph').setup{ socket = ... })")
local pipe = uv.new_pipe(false)
local done, cerr = false, nil
pipe:connect(self.socket_path, function(e)
cerr = e
done = true
end)
if not vim.wait(5000, function()
return done
end, 10) then
pcall(function()
pipe:close()
end)
error("heph: timed out connecting to hephd at " .. self.socket_path)
end
if cerr then
pcall(function()
pipe:close()
end)
error("heph: cannot connect to hephd at " .. self.socket_path .. ": " .. cerr)
end
self.pipe = pipe
self.buf = ""
self.connected = true
pipe:read_start(function(rerr, chunk)
if rerr then
self:_fail_all("connection error: " .. rerr)
elseif chunk == nil then
self:_fail_all("hephd closed the connection")
else
self:_on_bytes(chunk)
end
end)
end
--- Call `method` with `params`, blocking until the response. Raises a Lua error
--- on an rpc error or timeout. `opts.timeout` defaults to 5000ms.
function Session:call(method, params, opts)
opts = opts or {}
self:_ensure()
self.next_id = self.next_id + 1
local id = self.next_id
local slot = { done = false }
self.pending[id] = slot
-- Empty params must serialize as `{}`, not `[]` (the daemon parses an object).
if params == nil or (type(params) == "table" and vim.tbl_isempty(params)) then
params = vim.empty_dict()
end
local line = vim.json.encode({ id = id, method = method, params = params }) .. "\n"
self.pipe:write(line)
local ok = vim.wait(opts.timeout or 5000, function()
return slot.done
end, 5)
self.pending[id] = nil
if not ok then
error("heph: rpc timeout calling " .. method)
end
if slot.err then
error("heph: " .. slot.err)
end
return slot.result
end
--- Close the connection, failing any in-flight calls.
function Session:close()
self:_fail_all("connection closed")
end
local M = { Session = Session }
--- (Re)bind the default singleton session to `socket_path`.
function M.setup(socket_path)
if M._default then
M._default:close()
end
M._default = Session.new(socket_path)
return M._default
end
--- The default singleton session (created unconnected if absent).
function M.session()
if not M._default then
M._default = Session.new(nil)
end
return M._default
end
local function is_connection_error(msg)
msg = tostring(msg)
return msg:find("connect", 1, true) ~= nil
or msg:find("connection", 1, true) ~= nil
or msg:find("timeout", 1, true) ~= nil
end
--- Blocking call on the default session. The plugin is connect-only: on a
--- dropped connection we drop the dead session and **reconnect once** (e.g. the
--- daemon was restarted via `heph daemon restart`) — we never spawn a daemon.
function M.call(method, params, opts)
local ok, result = pcall(M.session().call, M.session(), method, params, opts)
if ok then
return result
end
if is_connection_error(result) then
M.session():close() -- drop the dead connection so the retry reconnects
return M.session():call(method, params, opts)
end
error(result)
end
--- An isolated session for a socket — used by tests for independent assertions.
function M.new_session(socket_path)
return Session.new(socket_path)
end
--- Close the default session.
function M.close()
if M._default then
M._default:close()
end
end
return M