//! SQLite-backed [`Store`] implementation. //! //! `LocalStore` opens a SQLite file directly. The exclusive-lock handoff of //! tech-spec §3.1 is layered on by `hephd` when it owns the file; the store //! itself stays a thin, synchronous SQLite wrapper so it is trivially testable //! against an in-memory database. //! //! The query logic lives in focused submodules ([`nodes`], [`tasks`], [`links`]) //! as free functions over a `&Connection`; the [`Store`] impl here is a thin //! delegating layer so a transaction can span several of them. mod apply; mod exporter; mod links; mod log; mod migrations; mod nodes; mod ops; mod syncstate; mod tags; mod tasks; pub use migrations::latest_version; use std::path::Path; use rusqlite::{Connection, OptionalExtension}; use ulid::Ulid; use crate::clock::Clock; use crate::error::{Error, Result}; use crate::filter::ListFilter; use crate::hlc::Hlc; use crate::model::{ Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, NodeKind, SchedulePatch, SyncCursors, Task, TaskState, }; use crate::oplog::Op; use crate::ranking::RankedTask; use crate::store::Store; /// A SQLite file (or in-memory database) opened directly as a backend. pub struct LocalStore { conn: Connection, owner_id: String, clock: Box, } impl LocalStore { /// Open (creating if needed) a SQLite database at `path`. pub fn open(path: impl AsRef, clock: Box) -> Result { let conn = Connection::open(path)?; Self::init(conn, clock) } /// Open a throwaway in-memory database — for tests. pub fn open_in_memory(clock: Box) -> Result { let conn = Connection::open_in_memory()?; Self::init(conn, clock) } fn init(conn: Connection, clock: Box) -> Result { conn.execute_batch("PRAGMA foreign_keys = ON;")?; migrations::migrate(&conn)?; ensure_origin(&conn)?; let owner_id = ensure_local_user(&conn, clock.as_ref())?; Ok(LocalStore { conn, owner_id, clock, }) } /// The id of the user whose data this store reads/writes. /// /// For a local-only instance this is the single generated local user /// (`oidc_sub = NULL`, tech-spec §13). pub fn owner_id(&self) -> &str { &self.owner_id } /// This device's stable `origin` id (HLC/sync identity). pub fn origin(&self) -> Result { meta_get(&self.conn, "origin")? .ok_or_else(|| Error::Integrity("missing device origin".into())) } } /// A fresh ULID, as a string id. pub(crate) fn new_id() -> String { Ulid::new().to_string() } /// Read a `meta` value. pub(super) fn meta_get(conn: &Connection, key: &str) -> Result> { Ok(conn .query_row("SELECT value FROM meta WHERE key = ?1", [key], |r| r.get(0)) .optional()?) } /// Upsert a `meta` value. pub(super) fn meta_set(conn: &Connection, key: &str, value: &str) -> Result<()> { conn.execute( "INSERT INTO meta (key, value) VALUES (?1, ?2) ON CONFLICT(key) DO UPDATE SET value = excluded.value", (key, value), )?; Ok(()) } /// Advance the persisted clock past a remote `hlc` (encoded), so future local /// stamps exceed everything we've seen (tech-spec §12). Encoded HLCs sort by /// causal order, so a string compare suffices. pub(super) fn absorb_remote_hlc(conn: &Connection, remote: &str) -> Result<()> { let bump = match meta_get(conn, "last_hlc")? { Some(last) => remote > last.as_str(), None => true, }; if bump { meta_set(conn, "last_hlc", remote)?; } Ok(()) } /// Ensure this store has a stable device `origin` id. fn ensure_origin(conn: &Connection) -> Result<()> { if meta_get(conn, "origin")?.is_none() { meta_set(conn, "origin", &new_id())?; } Ok(()) } /// Generate the next local [`Hlc`] (encoded), advancing the persisted clock in /// `meta`. Strictly greater than every previously-generated stamp; the /// read-modify-write runs inside the caller's connection/transaction and the /// store is single-writer, so there is no race. pub(super) fn next_hlc(conn: &Connection, now_ms: i64) -> Result { let origin = meta_get(conn, "origin")? .ok_or_else(|| Error::Integrity("missing device origin".into()))?; let last = match meta_get(conn, "last_hlc")? { Some(s) => Hlc::parse(&s)?, None => Hlc::zero(&origin), }; let physical = now_ms.max(last.physical); let counter = if physical == last.physical { last.counter + 1 } else { 0 }; let encoded = Hlc { physical, counter, origin, } .encode(); meta_set(conn, "last_hlc", &encoded)?; Ok(encoded) } /// Ensure a single local user exists, returning its id. fn ensure_local_user(conn: &Connection, clock: &dyn Clock) -> Result { if let Some(id) = conn .query_row( "SELECT id FROM users ORDER BY created_at LIMIT 1", [], |r| r.get::<_, String>(0), ) .optional()? { return Ok(id); } let id = new_id(); conn.execute( "INSERT INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'local', ?2)", (&id, clock.now_ms()), )?; Ok(id) } impl Store for LocalStore { fn create_node(&mut self, input: NewNode) -> Result { let now = self.clock.now_ms(); nodes::create(&self.conn, &self.owner_id, now, input) } fn get_node(&self, id: &str) -> Result> { nodes::get(&self.conn, id) } fn update_node( &mut self, id: &str, title: Option, body: Option, ) -> Result { let now = self.clock.now_ms(); nodes::update(&mut self.conn, &self.owner_id, now, id, title, body) } fn tombstone_node(&mut self, id: &str) -> Result<()> { let now = self.clock.now_ms(); nodes::tombstone(&self.conn, &self.owner_id, now, id) } fn resolve_node(&self, title: &str) -> Result> { match links::resolve_id(&self.conn, &self.owner_id, title)? { Some(id) => nodes::get(&self.conn, &id), None => Ok(None), } } fn create_task(&mut self, input: NewTask) -> Result { let now = self.clock.now_ms(); tasks::create(&mut self.conn, &self.owner_id, now, input) } fn get_task(&self, node_id: &str) -> Result> { tasks::get(&self.conn, node_id) } fn set_task_state(&mut self, node_id: &str, state: TaskState) -> Result { let now = self.clock.now_ms(); tasks::set_state(&mut self.conn, &self.owner_id, now, node_id, state) } fn skip_recurrence(&mut self, node_id: &str) -> Result { let now = self.clock.now_ms(); tasks::skip(&self.conn, &self.owner_id, now, node_id) } fn set_task_attention(&mut self, node_id: &str, attention: Attention) -> Result { let now = self.clock.now_ms(); tasks::set_attention(&self.conn, &self.owner_id, now, node_id, attention) } fn set_task_schedule(&mut self, node_id: &str, patch: SchedulePatch) -> Result { let now = self.clock.now_ms(); tasks::set_schedule(&self.conn, &self.owner_id, now, node_id, patch) } fn set_task_project(&mut self, node_id: &str, project_id: Option<&str>) -> Result { let now = self.clock.now_ms(); tasks::set_project(&mut self.conn, &self.owner_id, now, node_id, project_id) } fn delete_project(&mut self, project_id: &str) -> Result<()> { let now = self.clock.now_ms(); tasks::delete_project(&mut self.conn, &self.owner_id, now, project_id) } fn promote( &mut self, container_id: &str, item_ref: usize, attention: Option, project_id: Option, ) -> Result { let now = self.clock.now_ms(); tasks::promote( &mut self.conn, &self.owner_id, now, container_id, item_ref, attention, project_id, ) } fn next(&self, scope: Option<&str>, limit: usize) -> Result> { let now = self.clock.now_ms(); tasks::next(&self.conn, &self.owner_id, now, scope, limit) } fn list(&self, filter: &ListFilter) -> Result> { let now = self.clock.now_ms(); tasks::list(&self.conn, &self.owner_id, now, filter) } fn view(&self, name: &str) -> Result> { let now = self.clock.now_ms(); tasks::view(&self.conn, &self.owner_id, now, name) } fn project_scope(&self, name: &str) -> Result> { tasks::project_scope(&self.conn, &self.owner_id, name) } fn resolve_project(&self, name: &str) -> Result> { match links::resolve_project_id(&self.conn, &self.owner_id, name)? { Some(id) => nodes::get(&self.conn, &id), None => Ok(None), } } fn health(&self) -> Result { tasks::health(&self.conn, &self.owner_id) } fn search(&self, query: &str) -> Result> { nodes::search(&self.conn, &self.owner_id, query) } fn list_nodes(&self, kind: Option) -> Result> { nodes::list(&self.conn, &self.owner_id, kind) } fn list_linkable_nodes(&self) -> Result> { nodes::list_linkable(&self.conn, &self.owner_id) } fn journal_open_or_create(&mut self, date: &str) -> Result { let now = self.clock.now_ms(); nodes::open_or_create_journal(&self.conn, &self.owner_id, now, date) } fn add_link(&mut self, src_id: &str, dst_id: &str, link_type: LinkType) -> Result { let now = self.clock.now_ms(); links::add(&self.conn, &self.owner_id, now, src_id, dst_id, link_type) } fn outgoing_links(&self, id: &str) -> Result> { links::outgoing(&self.conn, id) } fn backlinks(&self, id: &str) -> Result> { links::backlinks(&self.conn, id) } fn add_tag(&mut self, node_id: &str, tag: &str) -> Result { let now = self.clock.now_ms(); tags::add(&mut self.conn, &self.owner_id, now, node_id, tag) } fn remove_tag(&mut self, node_id: &str, tag: &str) -> Result<()> { let now = self.clock.now_ms(); tags::remove(&mut self.conn, &self.owner_id, now, node_id, tag) } fn tags_of(&self, node_id: &str) -> Result> { tags::of(&self.conn, node_id) } fn migrate_wikilinks_to_ids(&mut self) -> Result { let owner = self.owner_id.clone(); let candidates: Vec<(String, String)> = { let mut stmt = self.conn.prepare( "SELECT id, body FROM nodes WHERE owner_id = ?1 AND tombstoned = 0 AND body IS NOT NULL", )?; let rows = stmt.query_map([&owner], |r| Ok((r.get(0)?, r.get(1)?)))?; rows.collect::>>()? }; let mut changed = 0; for (id, body) in candidates { let new_body = crate::wikilink::to_ids(&body, |t| { links::resolve_id(&self.conn, &owner, t).ok().flatten() }); if new_body != body { self.update_node(&id, None, Some(new_body))?; changed += 1; } } Ok(changed) } fn log_append(&mut self, task_id: &str, text: &str) -> Result<()> { let now = self.clock.now_ms(); let tx = self.conn.transaction()?; log::append(&tx, &self.owner_id, now, task_id, text)?; tx.commit()?; Ok(()) } fn log_tail(&self, task_id: &str, n: usize) -> Result> { log::tail(&self.conn, task_id, n) } fn export(&self, dir: &std::path::Path) -> Result { exporter::export(&self.conn, &self.owner_id, dir) } fn ops_since(&self, after: Option<&str>) -> Result> { ops::since(&self.conn, &self.owner_id, after) } fn apply_op(&mut self, op: &Op) -> Result { apply::apply(&mut self.conn, op) } fn adopt_owner(&mut self, canonical: &str) -> Result<()> { if self.owner_id == canonical { return Ok(()); } let old = self.owner_id.clone(); let tx = self.conn.transaction()?; tx.execute( "INSERT OR IGNORE INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'user', 0)", [canonical], )?; tx.execute( "UPDATE nodes SET owner_id = ?1 WHERE owner_id = ?2", (canonical, &old), )?; tx.execute( "UPDATE oplog SET owner_id = ?1 WHERE owner_id = ?2", (canonical, &old), )?; tx.execute( "UPDATE conflicts SET owner_id = ?1 WHERE owner_id = ?2", (canonical, &old), )?; tx.execute("DELETE FROM users WHERE id = ?1", [&old])?; tx.commit()?; self.owner_id = canonical.to_string(); Ok(()) } fn sync_state(&self, peer: &str) -> Result { syncstate::get(&self.conn, peer) } fn record_sync( &mut self, peer: &str, pushed: Option<&str>, pulled: Option<&str>, ) -> Result<()> { let now = self.clock.now_ms(); syncstate::record(&self.conn, peer, pushed, pulled, now) } fn resolve_owner(&mut self, sub: &str) -> Result> { // The owner's bound identity (NULL until first authenticated sync). let current: Option = self .conn .query_row( "SELECT oidc_sub FROM users WHERE id = ?1", [&self.owner_id], |r| r.get(0), ) .optional()? .flatten(); match current { None => { // Claim-on-first: bind this sub to the store's owner. self.conn.execute( "UPDATE users SET oidc_sub = ?1 WHERE id = ?2", (sub, &self.owner_id), )?; Ok(Some(self.owner_id.clone())) } Some(existing) if existing == sub => Ok(Some(self.owner_id.clone())), Some(_) => Ok(None), } } fn conflicts_list(&self) -> Result> { apply::list_conflicts(&self.conn, &self.owner_id) } fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()> { apply::resolve_conflict(&self.conn, id, choice) } } #[cfg(test)] mod tests { use super::*; use crate::clock::FixedClock; fn store_at(now_ms: i64) -> LocalStore { LocalStore::open_in_memory(Box::new(FixedClock(now_ms))).expect("open in-memory store") } #[test] fn migrations_bring_schema_to_latest() { let store = store_at(0); let v: i64 = store .conn .query_row("PRAGMA user_version", [], |r| r.get(0)) .unwrap(); assert_eq!(v, latest_version()); } #[test] fn project_scope_resolves_by_name_and_errors_on_unknown() { use crate::model::{NewNode, NodeKind}; let mut store = store_at(1); let p = store .create_node(NewNode { kind: NodeKind::Project, title: "Garden".into(), body: None, }) .unwrap(); assert_eq!(store.project_scope("Garden").unwrap(), vec![p.id]); assert!(store.project_scope("Nope").is_err()); } #[test] fn resolve_project_is_fuzzy_only_when_unambiguous() { use crate::model::{NewNode, NodeKind}; let mut store = store_at(1); let mk = |store: &mut LocalStore, title: &str| { store .create_node(NewNode { kind: NodeKind::Project, title: title.into(), body: None, }) .unwrap() .id }; let hephaestus = mk(&mut store, "Hephaestus"); let garden = mk(&mut store, "Garden"); let id = |o: Option| o.map(|n| n.id); // Exact (case-sensitive) wins. assert_eq!( id(store.resolve_project("Hephaestus").unwrap()), Some(hephaestus.clone()) ); // Case-insensitive exact. assert_eq!( id(store.resolve_project("hephaestus").unwrap()), Some(hephaestus.clone()) ); // Unambiguous prefix. assert_eq!( id(store.resolve_project("heph").unwrap()), Some(hephaestus.clone()) ); assert_eq!(id(store.resolve_project("gar").unwrap()), Some(garden)); // No match at all. assert_eq!(id(store.resolve_project("nope").unwrap()), None); // An exact (case-sensitive) title still wins even when it is a prefix of // another project — exactness beats fuzziness, never ambiguous. let work = mk(&mut store, "Work"); let _workshop = mk(&mut store, "Workshop"); assert_eq!(id(store.resolve_project("Work").unwrap()), Some(work)); // …but an ambiguous prefix with no exact match resolves to nothing. assert_eq!(id(store.resolve_project("Wor").unwrap()), None); } #[test] fn delete_project_unfiles_its_tasks_then_tombstones_it() { use crate::filter::ListFilter; use crate::model::{NewNode, NewTask, NodeKind}; let mut store = store_at(1); let proj = store .create_node(NewNode { kind: NodeKind::Project, title: "Garden".into(), body: None, }) .unwrap(); let task = store .create_task(NewTask { title: "Weed the beds".into(), attention: None, do_date: None, late_on: None, recurrence: None, project_id: None, }) .unwrap(); store .set_task_project(&task.node_id, Some(&proj.id)) .unwrap(); // Filed under the project before deletion. let filed = store .list(&ListFilter { scope: vec![proj.id.clone()], ..Default::default() }) .unwrap(); assert_eq!(filed.len(), 1); store.delete_project(&proj.id).unwrap(); // The project node is tombstoned… let ts: i64 = store .conn .query_row( "SELECT tombstoned FROM nodes WHERE id=?1", [&proj.id], |r| r.get(0), ) .unwrap(); assert_eq!(ts, 1); // …and the task survives, now unfiled (it shows in the Inbox), not orphaned. let inbox = store .list(&ListFilter { unfiled: true, ..Default::default() }) .unwrap(); assert!(inbox.iter().any(|t| t.node_id == task.node_id)); } #[test] fn resolve_node_matches_exact_title_not_fuzzy() { use crate::model::NewNode; let mut store = store_at(1); let roof = store.create_node(NewNode::doc("Roof", "shingles")).unwrap(); // A fuzzy/FTS match would surface this for "Roof"; exact resolve must not. store .create_node(NewNode::doc("Roofing options", "estimates")) .unwrap(); let got = store.resolve_node("Roof").unwrap().expect("exact title"); assert_eq!(got.id, roof.id); // A prefix is not an exact title — resolves to nothing, never the // fuzzy neighbour. assert!(store.resolve_node("Roo").unwrap().is_none()); // Tombstoned nodes are excluded. store.tombstone_node(&roof.id).unwrap(); assert!(store.resolve_node("Roof").unwrap().is_none()); } #[test] fn opening_twice_is_idempotent_for_the_local_user() { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap(); migrations::migrate(&conn).unwrap(); let a = ensure_local_user(&conn, &FixedClock(1)).unwrap(); let b = ensure_local_user(&conn, &FixedClock(2)).unwrap(); assert_eq!(a, b); } }