hephaestus/crates/hephd/src/remote.rs
Erich Blume dd5ef7dc63
Some checks failed
Build / validate (pull_request) Failing after 11s
fix: deleting a project unfiles its tasks to the Inbox (§8.1/§8.2)
Project delete previously tombstoned only the project node, leaving its
tasks with a live in-project link to a dead project — orphaned (not in the
Inbox, unbrowsable, blank project) rather than unfiled as intended. New
atomic Store::delete_project tombstones every in-project link to the project
(tasks fall to the Inbox), then tombstones the project node; tasks are never
deleted. Exposed as the project.delete RPC (LocalStore + RemoteStore); the
heph-tui sidebar `D` now routes through it. Core test asserts the task
survives and becomes unfiled; the project node is tombstoned.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 19:15:54 -07:00

329 lines
11 KiB
Rust

//! `RemoteStore` — the `client`-mode backend (tech-spec §3.1).
//!
//! A `client` keeps **no local replica**: it proxies every [`Store`] call to a
//! `server`'s `POST /rpc` endpoint (the same [`crate::rpc::dispatch`] the unix
//! socket runs). Since `Store` is synchronous, this uses a **blocking** reqwest
//! client; the daemon only ever calls store methods from its blocking pool, so
//! that is safe.
//!
//! With no op-log of its own, the sync primitives (`ops_since`/`apply_op`/
//! `sync_state`/`record_sync`) are not meaningful here — a `client` never
//! background-syncs, it reads and writes the hub live. They are stubbed
//! accordingly; the daemon never invokes them in this mode.
use std::sync::Arc;
use serde::de::DeserializeOwned;
use serde_json::{json, Value};
use heph_core::{
Attention, Conflict, Error, Health, Link, LinkType, ListFilter, NewNode, NewTask, Node,
NodeKind, Result, SchedulePatch, Store, SyncCursors, Task, TaskState,
};
use crate::oauth::{self, TokenStore};
use crate::rpc::{Response, NOT_FOUND};
/// How a client obtains the bearer token it presents to the server.
struct AuthCtx {
tokens: Arc<dyn TokenStore>,
issuer: String,
client_id: String,
}
/// A no-replica store that proxies to a `server` over HTTP.
pub struct RemoteStore {
base: String,
http: ureq::Agent,
auth: Option<AuthCtx>,
}
impl RemoteStore {
/// Point a client at `server_url` (e.g. `http://hub.example:8787`),
/// unauthenticated.
pub fn new(server_url: &str) -> RemoteStore {
RemoteStore {
base: server_url.trim_end_matches('/').to_string(),
http: crate::blocking_agent(),
auth: None,
}
}
/// Point a client at `server_url`, attaching a cached OIDC bearer token
/// (refreshed as needed) from `tokens` to every call.
pub fn with_auth(
server_url: &str,
tokens: Arc<dyn TokenStore>,
issuer: String,
client_id: String,
) -> RemoteStore {
RemoteStore {
auth: Some(AuthCtx {
tokens,
issuer,
client_id,
}),
..RemoteStore::new(server_url)
}
}
/// Issue one `/rpc` call, returning the raw `result` value.
fn call(&self, method: &str, params: Value) -> Result<Value> {
let mut request = self.http.post(format!("{}/rpc", self.base));
if let Some(auth) = &self.auth {
let bearer = oauth::current_bearer(auth.tokens.as_ref(), &auth.issuer, &auth.client_id)
.map_err(|e| Error::Remote(e.to_string()))?;
if let Some(bearer) = bearer {
request = request.header("Authorization", format!("Bearer {bearer}"));
}
}
let mut http_response = request
.send_json(json!({ "method": method, "params": params }))
.map_err(|e| Error::Remote(e.to_string()))?;
if !http_response.status().is_success() {
return Err(Error::Remote(format!(
"server returned {}",
http_response.status()
)));
}
let response: Response = http_response
.body_mut()
.read_json()
.map_err(|e| Error::Remote(e.to_string()))?;
if let Some(err) = response.error {
// Preserve "not found" so callers keep the typed contract.
return Err(if err.code == NOT_FOUND {
Error::NodeNotFound(err.message)
} else {
Error::Remote(err.message)
});
}
Ok(response.result.unwrap_or(Value::Null))
}
/// Call `method` and decode the result into `T`.
fn call_as<T: DeserializeOwned>(&self, method: &str, params: Value) -> Result<T> {
let value = self.call(method, params)?;
serde_json::from_value(value).map_err(|e| Error::Remote(e.to_string()))
}
}
impl Store for RemoteStore {
fn create_node(&mut self, input: NewNode) -> Result<Node> {
self.call_as("node.create", json!(input))
}
fn get_node(&self, id: &str) -> Result<Option<Node>> {
self.call_as("node.get", json!({ "id": id }))
}
fn update_node(
&mut self,
id: &str,
title: Option<String>,
body: Option<String>,
) -> Result<Node> {
self.call_as(
"node.update",
json!({ "id": id, "title": title, "body": body }),
)
}
fn tombstone_node(&mut self, id: &str) -> Result<()> {
self.call("node.tombstone", json!({ "id": id })).map(|_| ())
}
fn delete_project(&mut self, project_id: &str) -> Result<()> {
self.call("project.delete", json!({ "id": project_id }))
.map(|_| ())
}
fn resolve_node(&self, title: &str) -> Result<Option<Node>> {
self.call_as("node.resolve", json!({ "title": title }))
}
fn create_task(&mut self, input: NewTask) -> Result<Task> {
self.call_as("task.create", json!(input))
}
fn get_task(&self, node_id: &str) -> Result<Option<Task>> {
self.call_as("task.get", json!({ "id": node_id }))
}
fn set_task_state(&mut self, node_id: &str, state: TaskState) -> Result<Task> {
self.call_as("task.set_state", json!({ "id": node_id, "state": state }))
}
fn skip_recurrence(&mut self, node_id: &str) -> Result<Task> {
self.call_as("task.skip", json!({ "id": node_id }))
}
fn set_task_attention(&mut self, node_id: &str, attention: Attention) -> Result<Task> {
self.call_as(
"task.set_attention",
json!({ "id": node_id, "attention": attention }),
)
}
fn set_task_schedule(&mut self, node_id: &str, patch: SchedulePatch) -> Result<Task> {
// Serialize the patch (absent fields are skipped), then inject `id`.
let mut params = serde_json::to_value(&patch).expect("SchedulePatch serializes");
params["id"] = json!(node_id);
self.call_as("task.set_schedule", params)
}
fn set_task_project(&mut self, node_id: &str, project_id: Option<&str>) -> Result<Task> {
self.call_as(
"task.set_project",
json!({ "id": node_id, "project_id": project_id }),
)
}
fn promote(
&mut self,
container_id: &str,
item_ref: usize,
attention: Option<Attention>,
project_id: Option<String>,
) -> Result<Task> {
self.call_as(
"task.promote",
json!({
"container_id": container_id,
"item_ref": item_ref,
"attention": attention,
"project": project_id,
}),
)
}
fn next(&self, scope: Option<&str>, limit: usize) -> Result<Vec<heph_core::RankedTask>> {
self.call_as("next", json!({ "scope": scope, "limit": limit }))
}
fn list(&self, filter: &ListFilter) -> Result<Vec<heph_core::RankedTask>> {
self.call_as("list", json!(filter))
}
fn view(&self, name: &str) -> Result<Vec<heph_core::RankedTask>> {
self.call_as("view", json!({ "name": name }))
}
fn health(&self) -> Result<Health> {
self.call_as("health", json!({}))
}
fn search(&self, query: &str) -> Result<Vec<Node>> {
self.call_as("search", json!({ "query": query }))
}
fn list_nodes(&self, kind: Option<NodeKind>) -> Result<Vec<Node>> {
self.call_as("node.list", json!({ "kind": kind.map(|k| k.as_str()) }))
}
fn list_linkable_nodes(&self) -> Result<Vec<Node>> {
self.call_as("node.linkable", json!({}))
}
fn journal_open_or_create(&mut self, date: &str) -> Result<Node> {
self.call_as("journal.open_or_create", json!({ "date": date }))
}
fn add_link(&mut self, src_id: &str, dst_id: &str, link_type: LinkType) -> Result<Link> {
self.call_as(
"links.add",
json!({ "src": src_id, "dst": dst_id, "link_type": link_type }),
)
}
fn outgoing_links(&self, id: &str) -> Result<Vec<Link>> {
self.call_as("links.outgoing", json!({ "id": id }))
}
fn backlinks(&self, id: &str) -> Result<Vec<Link>> {
self.call_as("links.backlinks", json!({ "id": id }))
}
fn add_tag(&mut self, node_id: &str, tag: &str) -> Result<Node> {
self.call_as("tag.add", json!({ "node_id": node_id, "tag": tag }))
}
fn remove_tag(&mut self, node_id: &str, tag: &str) -> Result<()> {
self.call("tag.remove", json!({ "node_id": node_id, "tag": tag }))
.map(|_| ())
}
fn tags_of(&self, node_id: &str) -> Result<Vec<String>> {
self.call_as("tag.list", json!({ "node_id": node_id }))
}
fn migrate_wikilinks_to_ids(&mut self) -> Result<usize> {
self.call_as("migrate.wikilinks", json!({}))
}
fn log_append(&mut self, task_id: &str, text: &str) -> Result<()> {
self.call("log.append", json!({ "task_id": task_id, "text": text }))
.map(|_| ())
}
fn log_tail(&self, task_id: &str, n: usize) -> Result<Vec<String>> {
self.call_as("log.tail", json!({ "task_id": task_id, "n": n }))
}
fn export(&self, dir: &std::path::Path) -> Result<usize> {
// Export runs server-side, writing under `dir` on the server's host.
let out: Value = self.call("export", json!({ "path": dir.to_string_lossy() }))?;
out.get("count")
.and_then(Value::as_u64)
.map(|n| n as usize)
.ok_or_else(|| Error::Remote("export: missing count".into()))
}
// --- sync primitives: not meaningful for a no-replica client ---
fn ops_since(&self, _after: Option<&str>) -> Result<Vec<heph_core::Op>> {
Err(Error::Remote(
"ops_since is unsupported in client mode".into(),
))
}
fn apply_op(&mut self, _op: &heph_core::Op) -> Result<bool> {
Err(Error::Remote(
"apply_op is unsupported in client mode".into(),
))
}
fn adopt_owner(&mut self, _canonical: &str) -> Result<()> {
// The server owns the data; a client has nothing local to re-own.
Ok(())
}
fn sync_state(&self, _peer: &str) -> Result<SyncCursors> {
Ok(SyncCursors::default())
}
fn record_sync(
&mut self,
_peer: &str,
_pushed: Option<&str>,
_pulled: Option<&str>,
) -> Result<()> {
Ok(())
}
fn authorize_owner_sub(&mut self, _sub: &str) -> Result<bool> {
// Hub-side gate; a no-replica client never hosts an endpoint to guard.
Err(Error::Remote(
"authorize_owner_sub is a hub-side operation".into(),
))
}
fn conflicts_list(&self) -> Result<Vec<Conflict>> {
self.call_as("conflicts.list", json!({}))
}
fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()> {
self.call("conflicts.resolve", json!({ "id": id, "choice": choice }))
.map(|_| ())
}
}